2 Copyright (C) 2010 - 2020 Proxmox Server Solutions GmbH
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 Author: Dietmar Maurer <dietmar@proxmox.com>
22 /* NOTE: we try to keep the CPG handle as long as possible, because
23 * calling cpg_initialize/cpg_finalize multiple times from the
24 * same process confuses corosync.
25 * Note: CS_ERR_LIBRARY is returned when corosync died
30 #endif /* HAVE_CONFIG_H */
32 #include <sys/types.h>
38 #include <corosync/corotypes.h>
39 #include <corosync/cpg.h>
42 #include "cfs-utils.h"
45 static cpg_callbacks_t cpg_callbacks
;
49 DFSM_MODE_START_SYNC
= 1,
53 /* values >= 128 indicate abnormal/error conditions */
54 DFSM_ERROR_MODE_START
= 128,
55 DFSM_MODE_LEAVE
= 253,
56 DFSM_MODE_VERSION_ERROR
= 254,
57 DFSM_MODE_ERROR
= 255,
61 DFSM_MESSAGE_NORMAL
= 0,
62 DFSM_MESSAGE_SYNC_START
= 1,
63 DFSM_MESSAGE_STATE
= 2,
64 DFSM_MESSAGE_UPDATE
= 3,
65 DFSM_MESSAGE_UPDATE_COMPLETE
= 4,
66 DFSM_MESSAGE_VERIFY_REQUEST
= 5,
67 DFSM_MESSAGE_VERIFY
= 6,
/* True when message type mt lies in the inclusive range
 * DFSM_MESSAGE_SYNC_START..DFSM_MESSAGE_VERIFY, i.e. it is one of the
 * state-related message types.  The argument is parenthesized so that an
 * expression containing low-precedence operators is compared as a whole.
 * Note that mt is still evaluated twice - do not pass expressions with
 * side effects. */
#define DFSM_VALID_STATE_MESSAGE(mt) ((mt) >= DFSM_MESSAGE_SYNC_START && (mt) <= DFSM_MESSAGE_VERIFY)
75 uint32_t protocol_version
;
78 } dfsm_message_header_t
;
81 uint32_t epoch
; // per process (not globally unique)
88 dfsm_message_header_t base
;
89 dfsm_sync_epoch_t epoch
;
90 } dfsm_message_state_header_t
;
93 dfsm_message_header_t base
;
95 } dfsm_message_normal_header_t
;
102 int msg_len
; // fixme: unsigned?
103 } dfsm_queued_message_t
;
106 const char *log_domain
;
107 cpg_callbacks_t
*cpg_callbacks
;
108 dfsm_callbacks_t
*dfsm_callbacks
;
109 cpg_handle_t cpg_handle
;
111 struct cpg_name cpg_group_name
;
116 guint32 protocol_version
;
121 /* mode is protected with mode_mutex */
125 GHashTable
*members
; /* contains dfsm_node_info_t pointers */
126 dfsm_sync_info_t
*sync_info
;
127 uint32_t local_epoch_counter
;
128 dfsm_sync_epoch_t sync_epoch
;
129 uint32_t lowest_nodeid
;
130 GSequence
*msg_queue
;
133 /* synchronous message transmission, protected with sync_mutex */
138 uint64_t msgcount_rcvd
;
140 /* state verification */
142 dfsm_sync_epoch_t csum_epoch
;
144 uint64_t csum_counter
;
147 static gboolean
dfsm_deliver_queue(dfsm_t
*dfsm
);
148 static gboolean
dfsm_deliver_sync_queue(dfsm_t
*dfsm
);
/* Report whether a nodeid/pid pair identifies this process: the result is
 * true only when both values equal the nodeid and pid stored in the dfsm
 * handle.  A NULL dfsm fails the precondition check and yields FALSE.
 * NOTE(review): the parameter list is not visible in this excerpt. */
151 dfsm_nodeid_is_local(
156 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
158 return (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
);
/* Release every thread waiting on sync_cond for the result of a
 * synchronous message.  Under sync_mutex the received counter is advanced
 * to the sent counter (msgcount_rcvd = msgcount), so the wait condition
 * "msgcount_rcvd < msgcount" used by dfsm_send_message_sync() becomes false
 * for all waiters, and sync_cond is broadcast.  Does nothing when dfsm is
 * NULL. */
163 dfsm_send_sync_message_abort(dfsm_t
*dfsm
)
165 g_return_if_fail(dfsm
!= NULL
);
167 g_mutex_lock (&dfsm
->sync_mutex
);
168 dfsm
->msgcount_rcvd
= dfsm
->msgcount
;
169 g_cond_broadcast (&dfsm
->sync_cond
);
170 g_mutex_unlock (&dfsm
->sync_mutex
);
/* Store the outcome of a locally sent message and wake the sender.
 * Under sync_mutex the result record registered for msg_count is looked up
 * in dfsm->results, its result/processed fields are filled in,
 * msgcount_rcvd is set to msg_count and sync_cond is broadcast.
 * Returns early when dfsm or dfsm->results is NULL.
 * NOTE(review): the parameter list and the line between the lookup and the
 * stores are not visible here - presumably the stores are guarded against
 * a failed lookup (rp == NULL); confirm. */
174 dfsm_record_local_result(
180 g_return_if_fail(dfsm
!= NULL
);
181 g_return_if_fail(dfsm
->results
!= NULL
);
183 g_mutex_lock (&dfsm
->sync_mutex
);
184 dfsm_result_t
*rp
= (dfsm_result_t
*)g_hash_table_lookup(dfsm
->results
, &msg_count
);
186 rp
->result
= msg_result
;
187 rp
->processed
= processed
;
/* publish the new received counter and wake threads blocked on sync_cond */
189 dfsm
->msgcount_rcvd
= msg_count
;
190 g_cond_broadcast (&dfsm
->sync_cond
);
191 g_mutex_unlock (&dfsm
->sync_mutex
);
/* Multicast an iovec array to the joined CPG group using
 * cpg_mcast_joined() with CPG_TYPE_AGREED.  The cpg call itself is made
 * while holding cpg_mutex.
 *
 * Returns CS_ERR_INVALID_PARAM when dfsm is NULL or when len is non-zero
 * but iov is NULL.  When retry is set and corosync answers
 * CS_ERR_TRY_AGAIN the code sleeps 100 ms (tvreq) and logs every 10th
 * retry; any other failure is logged as critical.
 * NOTE(review): the loop construct, the retry limit and the final return
 * are on lines not visible in this excerpt. */
195 dfsm_send_message_full(
201 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
202 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
/* pause between retries: 100 ms */
204 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
208 g_mutex_lock (&dfsm
->cpg_mutex
);
209 result
= cpg_mcast_joined(dfsm
->cpg_handle
, CPG_TYPE_AGREED
, iov
, len
);
210 g_mutex_unlock (&dfsm
->cpg_mutex
);
211 if (retry
&& result
== CS_ERR_TRY_AGAIN
) {
212 nanosleep(&tvreq
, NULL
);
214 if ((retries
% 10) == 0)
215 cfs_dom_message(dfsm
->log_domain
, "cpg_send_message retry %d", retries
);
221 cfs_dom_message(dfsm
->log_domain
, "cpg_send_message retried %d times", retries
);
/* report every failure except a TRY_AGAIN that retrying was allowed to absorb */
223 if (result
!= CS_OK
&&
224 (!retry
|| result
!= CS_ERR_TRY_AGAIN
))
225 cfs_dom_critical(dfsm
->log_domain
, "cpg_send_message failed: %d", result
);
/* Send a state-machine message of the given type.
 * A dfsm_message_state_header_t is filled in (type, subtype 0, protocol
 * version, current time, reserved 0, and the current sync epoch) and
 * prepended to the caller's iovec array; the combined array is passed to
 * dfsm_send_message_full() with its last argument set to 1.
 *
 * Returns CS_ERR_INVALID_PARAM when dfsm is NULL, when type is outside the
 * range accepted by DFSM_VALID_STATE_MESSAGE, or when len is non-zero but
 * iov is NULL; otherwise the result of dfsm_send_message_full(). */
231 dfsm_send_state_message_full(
237 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
238 g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type
), CS_ERR_INVALID_PARAM
);
239 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
241 dfsm_message_state_header_t header
;
242 header
.base
.type
= type
;
243 header
.base
.subtype
= 0;
244 header
.base
.protocol_version
= dfsm
->protocol_version
;
245 header
.base
.time
= time(NULL
);
246 header
.base
.reserved
= 0;
248 header
.epoch
= dfsm
->sync_epoch
;
/* one extra slot in front for the header (variable-length array on the stack) */
250 struct iovec real_iov
[len
+ 1];
252 real_iov
[0].iov_base
= (char *)&header
;
253 real_iov
[0].iov_len
= sizeof(header
);
/* copy the caller's entries behind the header */
255 for (int i
= 0; i
< len
; i
++)
256 real_iov
[i
+ 1] = iov
[i
];
258 return dfsm_send_message_full(dfsm
, real_iov
, len
+ 1, 1);
267 return dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_UPDATE
, iov
, len
);
/* Send a DFSM_MESSAGE_UPDATE_COMPLETE state message without payload
 * (NULL iovec, length 0) and return the result of
 * dfsm_send_state_message_full(). */
271 dfsm_send_update_complete(dfsm_t
*dfsm
)
273 return dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_UPDATE_COMPLETE
, NULL
, 0);
284 return dfsm_send_message_sync(dfsm
, msgtype
, iov
, len
, NULL
);
/* Send a normal (application) message, with handling for a result record
 * rp that is registered in dfsm->results under the message count.
 *
 * While holding sync_mutex a new message count is allocated
 * (++dfsm->msgcount), a dfsm_message_normal_header_t carrying msgtype as
 * subtype and the count is prepended to the caller's iovec array, and the
 * message is sent through dfsm_send_message_full().  On a send failure
 * the result entry is removed again.  Otherwise the code waits on
 * sync_cond until msgcount_rcvd reaches this message's count, removes the
 * result entry, and returns CS_OK if rp->processed is set, else
 * CS_ERR_FAILED_OPERATION.
 *
 * Returns CS_ERR_INVALID_PARAM when dfsm is NULL or when len is non-zero
 * but iov is NULL.
 * NOTE(review): the parameter list and the conditions guarding the
 * rp-related statements (rp may be NULL, see the caller that passes NULL)
 * are on lines not visible in this excerpt. */
288 dfsm_send_message_sync(
295 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
296 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
298 g_mutex_lock (&dfsm
->sync_mutex
);
299 /* note: hold lock until message is sent - to guarantee ordering */
300 uint64_t msgcount
= ++dfsm
->msgcount
;
302 rp
->msgcount
= msgcount
;
/* the key points into rp itself, so rp must outlive the table entry */
304 g_hash_table_replace(dfsm
->results
, &rp
->msgcount
, rp
);
307 dfsm_message_normal_header_t header
;
308 header
.base
.type
= DFSM_MESSAGE_NORMAL
;
309 header
.base
.subtype
= msgtype
;
310 header
.base
.protocol_version
= dfsm
->protocol_version
;
311 header
.base
.time
= time(NULL
);
312 header
.base
.reserved
= 0;
313 header
.count
= msgcount
;
/* one extra slot in front for the header (variable-length array on the stack) */
315 struct iovec real_iov
[len
+ 1];
317 real_iov
[0].iov_base
= (char *)&header
;
318 real_iov
[0].iov_len
= sizeof(header
);
320 for (int i
= 0; i
< len
; i
++)
321 real_iov
[i
+ 1] = iov
[i
];
323 cs_error_t result
= dfsm_send_message_full(dfsm
, real_iov
, len
+ 1, 1);
325 g_mutex_unlock (&dfsm
->sync_mutex
);
327 if (result
!= CS_OK
) {
328 cfs_dom_critical(dfsm
->log_domain
, "cpg_send_message failed: %d", result
);
/* sending failed: drop the result entry registered above */
331 g_mutex_lock (&dfsm
->sync_mutex
);
332 g_hash_table_remove(dfsm
->results
, &rp
->msgcount
);
333 g_mutex_unlock (&dfsm
->sync_mutex
);
339 g_mutex_lock (&dfsm
->sync_mutex
);
/* block until the received counter has reached this message's count */
341 while (dfsm
->msgcount_rcvd
< msgcount
)
342 g_cond_wait (&dfsm
->sync_cond
, &dfsm
->sync_mutex
);
345 g_hash_table_remove(dfsm
->results
, &rp
->msgcount
);
347 g_mutex_unlock (&dfsm
->sync_mutex
);
349 return rp
->processed
? CS_OK
: CS_ERR_FAILED_OPERATION
;
/* Send the stored checksum as a DFSM_MESSAGE_VERIFY state message.
 * The payload consists of two parts: dfsm->csum_id followed by the
 * dfsm->csum buffer.  res is TRUE when dfsm_send_state_message_full()
 * returned CS_OK.  A NULL dfsm yields FALSE.
 * NOTE(review): the declaration of len and the return statement are on
 * lines not visible here; len is presumably 2 since exactly iov[0] and
 * iov[1] are filled - confirm. */
356 dfsm_send_checksum(dfsm_t
*dfsm
)
358 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
361 struct iovec iov
[len
];
363 iov
[0].iov_base
= (char *)&dfsm
->csum_id
;
364 iov
[0].iov_len
= sizeof(dfsm
->csum_id
);
365 iov
[1].iov_base
= dfsm
->csum
;
366 iov
[1].iov_len
= sizeof(dfsm
->csum
);
368 gboolean res
= (dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_VERIFY
, iov
, len
) == CS_OK
);
/* Destructor-style helper for a queued message: data is interpreted as a
 * dfsm_queued_message_t pointer.
 * NOTE(review): the statements that actually release the entry are on
 * lines not visible in this excerpt. */
374 dfsm_free_queue_entry(gpointer data
)
376 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)data
;
/* Empty dfsm->msg_queue: walk the GSequence from begin to end, free each
 * queued message with dfsm_free_queue_entry() and remove its node from the
 * sequence.  Returns early when dfsm or dfsm->msg_queue is NULL. */
382 dfsm_free_message_queue(dfsm_t
*dfsm
)
384 g_return_if_fail(dfsm
!= NULL
);
385 g_return_if_fail(dfsm
->msg_queue
!= NULL
);
387 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
388 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
389 while (iter
!= end
) {
/* advance first so that removing cur does not disturb the iteration */
390 GSequenceIter
*cur
= iter
;
391 iter
= g_sequence_iter_next(iter
);
392 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
394 dfsm_free_queue_entry(qm
);
395 g_sequence_remove(cur
);
/* Discard dfsm->sync_queue: every queued message in the GList is freed
 * with dfsm_free_queue_entry(), then the list itself is freed and the
 * pointer reset to NULL.  Returns early when dfsm is NULL.
 * NOTE(review): the loop header is on a line not visible here. */
400 dfsm_free_sync_queue(dfsm_t
*dfsm
)
402 g_return_if_fail(dfsm
!= NULL
);
404 GList
*iter
= dfsm
->sync_queue
;
406 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)iter
->data
;
407 iter
= g_list_next(iter
);
408 dfsm_free_queue_entry(qm
);
411 g_list_free(dfsm
->sync_queue
);
412 dfsm
->sync_queue
= NULL
;
/* Comparison callback used with g_sequence_insert_sorted() for msg_queue:
 * orders two dfsm_queued_message_t entries by their msg_count, returning
 * the difference a->msg_count - b->msg_count.
 * NOTE(review): callers store a uint64_t count in msg_count; if the field
 * is 64 bits wide and this function returns int (signature not visible
 * here), the difference is truncated and its sign may be wrong for counts
 * that differ by 2^31 or more - confirm. */
416 message_queue_sort_fn(
421 return ((dfsm_queued_message_t
*)a
)->msg_count
-
422 ((dfsm_queued_message_t
*)b
)->msg_count
;
/* Find the member entry for a nodeid/pid pair in dfsm->members.
 * A temporary dfsm_node_info_t holding only nodeid and pid serves as the
 * lookup key.  Returns the stored entry, or NULL when nothing matches or
 * when dfsm or dfsm->members is NULL. */
425 static dfsm_node_info_t
*
426 dfsm_node_info_lookup(
431 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
432 g_return_val_if_fail(dfsm
->members
!= NULL
, NULL
);
434 dfsm_node_info_t info
= { .nodeid
= nodeid
, .pid
= pid
};
436 return (dfsm_node_info_t
*)g_hash_table_lookup(dfsm
->members
, &info
);
/* Queue a received message for later delivery.
 * The sender is looked up in the member table, a new
 * dfsm_queued_message_t is allocated, and the message bytes are
 * duplicated with g_memdup2().  When the state machine is in
 * DFSM_MODE_UPDATE and the sender is marked synced the entry is appended
 * to sync_queue; the other visible path inserts it into msg_queue sorted
 * by message_queue_sort_fn.
 * Returns NULL when dfsm or msg is NULL or msg_len is 0.
 * NOTE(review): the parameter list, the handling after a failed member
 * lookup, the remaining field assignments and the return statement are on
 * lines not visible in this excerpt. */
439 static dfsm_queued_message_t
*
440 dfsm_queue_add_message(
448 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
449 g_return_val_if_fail(msg
!= NULL
, NULL
);
450 g_return_val_if_fail(msg_len
!= 0, NULL
);
452 dfsm_node_info_t
*ni
= dfsm_node_info_lookup(dfsm
, nodeid
, pid
);
454 cfs_dom_critical(dfsm
->log_domain
, "dfsm_node_info_lookup failed");
458 dfsm_queued_message_t
*qm
= g_new0(dfsm_queued_message_t
, 1);
459 g_return_val_if_fail(qm
!= NULL
, NULL
);
/* keep a private copy - the caller's buffer is not retained */
463 qm
->msg
= g_memdup2 (msg
, msg_len
);
464 qm
->msg_len
= msg_len
;
465 qm
->msg_count
= msg_count
;
467 if (dfsm
->mode
== DFSM_MODE_UPDATE
&& ni
->synced
) {
468 dfsm
->sync_queue
= g_list_append(dfsm
->sync_queue
, qm
);
470 /* NOTE: we only need to sort the queue because we resend all
471 * queued messages sometimes.
473 g_sequence_insert_sorted(dfsm
->msg_queue
, qm
, message_queue_sort_fn
, NULL
);
/* Hash callback for the members table: key is a dfsm_node_info_t and the
 * hash is the sum of g_int_hash() over its nodeid and pid fields. */
480 dfsm_sync_info_hash(gconstpointer key
)
482 dfsm_node_info_t
*info
= (dfsm_node_info_t
*)key
;
484 return g_int_hash(&info
->nodeid
) + g_int_hash(&info
->pid
);
/* Equality callback for the members table: two dfsm_node_info_t keys are
 * compared on both nodeid and pid.
 * NOTE(review): the parameter list and the return statements are on lines
 * not visible in this excerpt. */
488 dfsm_sync_info_equal(
492 dfsm_node_info_t
*info1
= (dfsm_node_info_t
*)v1
;
493 dfsm_node_info_t
*info2
= (dfsm_node_info_t
*)v2
;
495 if (info1
->nodeid
== info2
->nodeid
&&
496 info1
->pid
== info2
->pid
)
/* Ordering callback passed to qsort() for the sync_info node array:
 * entries are ordered by nodeid first and by pid when the nodeids are
 * equal.  The result is the plain difference of the compared fields.
 * NOTE(review): if nodeid/pid are unsigned 32-bit fields (not visible
 * here), the difference wraps and the sign of the returned value can be
 * wrong for values that are far apart - confirm. */
503 dfsm_sync_info_compare(
507 dfsm_node_info_t
*info1
= (dfsm_node_info_t
*)v1
;
508 dfsm_node_info_t
*info2
= (dfsm_node_info_t
*)v2
;
510 if (info1
->nodeid
!= info2
->nodeid
)
511 return info1
->nodeid
- info2
->nodeid
;
513 return info1
->pid
- info2
->pid
;
/* dfsm_set_mode: switch the state machine to new_mode and log the
 * transition.
 *
 * The mode field is updated under mode_mutex.  A change is applied when
 * new_mode is a normal mode (< DFSM_ERROR_MODE_START), or when the
 * current mode is a normal mode, or when new_mode is an error mode with a
 * value >= the current one.  In other words, an error mode is never
 * replaced by a lower-valued error mode.
 *
 * After unlocking, a message is logged per mode; for DFSM_MODE_SYNCED the
 * optional dfsm_synced_fn callback is invoked as well.
 * NOTE(review): the first line of the signature and the statements
 * between the unlock and the logging chain are not visible here, so
 * whether logging is skipped for a refused/unchanged mode cannot be told
 * from this excerpt. */
519 dfsm_mode_t new_mode
)
521 g_return_if_fail(dfsm
!= NULL
);
523 cfs_debug("dfsm_set_mode - set mode to %d", new_mode
);
526 g_mutex_lock (&dfsm
->mode_mutex
);
527 if (dfsm
->mode
!= new_mode
) {
528 if (new_mode
< DFSM_ERROR_MODE_START
||
529 (dfsm
->mode
< DFSM_ERROR_MODE_START
|| new_mode
>= dfsm
->mode
)) {
530 dfsm
->mode
= new_mode
;
534 g_mutex_unlock (&dfsm
->mode_mutex
);
539 if (new_mode
== DFSM_MODE_START
) {
540 cfs_dom_message(dfsm
->log_domain
, "start cluster connection");
541 } else if (new_mode
== DFSM_MODE_START_SYNC
) {
542 cfs_dom_message(dfsm
->log_domain
, "starting data syncronisation");
543 } else if (new_mode
== DFSM_MODE_SYNCED
) {
544 cfs_dom_message(dfsm
->log_domain
, "all data is up to date");
/* the synced notification callback is optional */
545 if (dfsm
->dfsm_callbacks
->dfsm_synced_fn
)
546 dfsm
->dfsm_callbacks
->dfsm_synced_fn(dfsm
);
547 } else if (new_mode
== DFSM_MODE_UPDATE
) {
548 cfs_dom_message(dfsm
->log_domain
, "waiting for updates from leader");
549 } else if (new_mode
== DFSM_MODE_LEAVE
) {
550 cfs_dom_critical(dfsm
->log_domain
, "leaving CPG group");
551 } else if (new_mode
== DFSM_MODE_ERROR
) {
552 cfs_dom_critical(dfsm
->log_domain
, "serious internal error - stop cluster connection");
553 } else if (new_mode
== DFSM_MODE_VERSION_ERROR
) {
554 cfs_dom_critical(dfsm
->log_domain
, "detected newer protocol - please update this node");
/* Read the current mode of the state machine.  The mode field is copied
 * while holding mode_mutex.  A NULL dfsm yields DFSM_MODE_ERROR.
 * NOTE(review): the return statement is on a line not visible here;
 * presumably the copied value is returned. */
559 dfsm_get_mode(dfsm_t
*dfsm
)
561 g_return_val_if_fail(dfsm
!= NULL
, DFSM_MODE_ERROR
);
563 g_mutex_lock (&dfsm
->mode_mutex
);
564 dfsm_mode_t mode
= dfsm
->mode
;
565 g_mutex_unlock (&dfsm
->mode_mutex
);
/* Tell whether the state machine may be restarted: the result is false
 * only when the current mode is DFSM_MODE_ERROR or
 * DFSM_MODE_VERSION_ERROR.  Every other mode - including the error-range
 * mode DFSM_MODE_LEAVE - yields true.  Because dfsm_get_mode() returns
 * DFSM_MODE_ERROR for a NULL handle, a NULL dfsm yields false. */
571 dfsm_restartable(dfsm_t
*dfsm
)
573 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
575 return !(mode
== DFSM_MODE_ERROR
||
576 mode
== DFSM_MODE_VERSION_ERROR
);
/* Convenience wrapper: switch the state machine to DFSM_MODE_ERROR via
 * dfsm_set_mode(). */
580 dfsm_set_errormode(dfsm_t
*dfsm
)
582 dfsm_set_mode(dfsm
, DFSM_MODE_ERROR
);
/* Release the resources attached to the current sync_info and, from the
 * visible code, rebuild the member bookkeeping from member_list.
 *
 * Release part: if sync_info exists, the optional dfsm_cleanup_fn
 * callback is called when sync_info->data is set (data is then reset to
 * NULL), and every per-node state buffer is freed and cleared.
 *
 * Rebuild part: the members hash table is emptied, the old sync_info is
 * freed and a new zeroed one is allocated with node_count =
 * member_list_entries.  Its nodes array is filled with the nodeid/pid of
 * each member, sorted with dfsm_sync_info_compare, and every node is
 * inserted into the members table (the node serves as both key and
 * value).  The node matching this process becomes sync_info->local.
 *
 * Returns early when dfsm or dfsm->members is NULL, or when
 * member_list_entries is non-zero but member_list is NULL.
 * NOTE(review): the first parameter and the lines around the rebuild part
 * are not visible; callers pass (NULL, 0) to release only, so the rebuild
 * is presumably guarded by a check on member_list - confirm.
 * NOTE(review): the allocation size multiplies by
 * sizeof(dfsm_sync_info_t) while qsort uses sizeof(dfsm_node_info_t) as
 * the element size - this looks like an over-allocation; confirm. */
586 dfsm_release_sync_resources(
588 const struct cpg_address
*member_list
,
589 size_t member_list_entries
)
591 g_return_if_fail(dfsm
!= NULL
);
592 g_return_if_fail(dfsm
->members
!= NULL
);
593 g_return_if_fail(!member_list_entries
|| member_list
!= NULL
);
595 cfs_debug("enter dfsm_release_sync_resources");
597 if (dfsm
->sync_info
) {
599 if (dfsm
->sync_info
->data
&& dfsm
->dfsm_callbacks
->dfsm_cleanup_fn
) {
600 dfsm
->dfsm_callbacks
->dfsm_cleanup_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
601 dfsm
->sync_info
->data
= NULL
;
/* drop the state blobs received from each node */
604 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++) {
605 if (dfsm
->sync_info
->nodes
[i
].state
) {
606 g_free(dfsm
->sync_info
->nodes
[i
].state
);
607 dfsm
->sync_info
->nodes
[i
].state
= NULL
;
608 dfsm
->sync_info
->nodes
[i
].state_len
= 0;
/* the table entries point into the old sync_info, so empty it before freeing */
615 g_hash_table_remove_all(dfsm
->members
);
618 g_free(dfsm
->sync_info
);
620 int size
= sizeof(dfsm_sync_info_t
) +
621 member_list_entries
*sizeof(dfsm_sync_info_t
);
622 dfsm_sync_info_t
*sync_info
= dfsm
->sync_info
= g_malloc0(size
);
623 sync_info
->node_count
= member_list_entries
;
625 for (int i
= 0; i
< member_list_entries
; i
++) {
626 sync_info
->nodes
[i
].nodeid
= member_list
[i
].nodeid
;
627 sync_info
->nodes
[i
].pid
= member_list
[i
].pid
;
630 qsort(sync_info
->nodes
, member_list_entries
, sizeof(dfsm_node_info_t
),
631 dfsm_sync_info_compare
);
/* index the sorted nodes; remember which entry is this process */
633 for (int i
= 0; i
< member_list_entries
; i
++) {
634 dfsm_node_info_t
*info
= &sync_info
->nodes
[i
];
635 g_hash_table_insert(dfsm
->members
, info
, info
);
636 if (info
->nodeid
== dfsm
->nodeid
&& info
->pid
== dfsm
->pid
)
637 sync_info
->local
= info
;
/* CPG delivery callback (registered as .cpg_deliver_fn in cpg_callbacks):
 * handles one message received from the process group.
 *
 * Visible flow:
 *  - The dfsm handle is fetched with cpg_context_get(); the message is
 *    ignored if that fails or the handle does not reference
 *    cpg_callbacks.
 *  - Messages are not processed in error modes, without sync_info, when
 *    shorter than the base header, or with a protocol version different
 *    from ours (a newer version switches to DFSM_MODE_VERSION_ERROR).
 *  - DFSM_MESSAGE_NORMAL: queued via dfsm_queue_add_message() while not
 *    in DFSM_MODE_SYNCED, otherwise handed to dfsm_deliver_fn; for
 *    messages sent by this process the result is recorded with
 *    dfsm_record_local_result().
 *  - All other types are state messages.  Except for SYNC_START they must
 *    carry the current sync epoch.  They are then handled per mode
 *    (SYNCED, START_SYNC, UPDATE).
 *  - The tail of the function switches to DFSM_MODE_LEAVE and releases
 *    the sync resources.
 * NOTE(review): most of the parameter list and the return/goto/else lines
 * are not visible in this excerpt, so the exact exit path taken by each
 * branch cannot be confirmed from here. */
643 dfsm_cpg_deliver_callback(
645 const struct cpg_name
*group_name
,
654 result
= cpg_context_get(handle
, (gpointer
*)&dfsm
);
655 if (result
!= CS_OK
|| !dfsm
|| dfsm
->cpg_callbacks
!= &cpg_callbacks
) {
656 cfs_critical("cpg_context_get error: %d (%p)", result
, (void *) dfsm
);
657 return; /* we have no valid dfsm pointer, so we can just ignore this */
659 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
661 cfs_dom_debug(dfsm
->log_domain
, "dfsm mode is %d", mode
);
663 if (mode
>= DFSM_ERROR_MODE_START
) {
664 cfs_dom_debug(dfsm
->log_domain
, "error mode - ignoring message");
668 if (!dfsm
->sync_info
) {
669 cfs_dom_critical(dfsm
->log_domain
, "no dfsm_sync_info - internal error");
673 if (msg_len
< sizeof(dfsm_message_header_t
)) {
674 cfs_dom_critical(dfsm
->log_domain
, "received short message (%zd bytes)", msg_len
);
678 dfsm_message_header_t
*base_header
= (dfsm_message_header_t
*)msg
;
/* a newer protocol is an error for this node; an older one is ignored */
680 if (base_header
->protocol_version
> dfsm
->protocol_version
) {
681 cfs_dom_critical(dfsm
->log_domain
, "received message with protocol version %d",
682 base_header
->protocol_version
);
683 dfsm_set_mode(dfsm
, DFSM_MODE_VERSION_ERROR
);
685 } else if (base_header
->protocol_version
< dfsm
->protocol_version
) {
686 cfs_dom_message(dfsm
->log_domain
, "ignore message with wrong protocol version %d",
687 base_header
->protocol_version
);
691 if (base_header
->type
== DFSM_MESSAGE_NORMAL
) {
693 dfsm_message_normal_header_t
*header
= (dfsm_message_normal_header_t
*)msg
;
695 if (msg_len
< sizeof(dfsm_message_normal_header_t
)) {
696 cfs_dom_critical(dfsm
->log_domain
, "received short message (type = %d, subtype = %d, %zd bytes)",
697 base_header
->type
, base_header
->subtype
, msg_len
);
/* not synced yet: keep the message for later delivery */
701 if (mode
!= DFSM_MODE_SYNCED
) {
702 cfs_dom_debug(dfsm
->log_domain
, "queue message %" PRIu64
" (subtype = %d, length = %zd)",
703 header
->count
, base_header
->subtype
, msg_len
);
705 if (!dfsm_queue_add_message(dfsm
, nodeid
, pid
, header
->count
, msg
, msg_len
))
/* hand the payload (message minus the normal header) to the application */
710 int res
= dfsm
->dfsm_callbacks
->dfsm_deliver_fn(
711 dfsm
, dfsm
->data
, &msg_res
, nodeid
, pid
, base_header
->subtype
,
712 base_header
->time
, (uint8_t *)msg
+ sizeof(dfsm_message_normal_header_t
),
713 msg_len
- sizeof(dfsm_message_normal_header_t
));
/* our own message: publish the result for the waiting sender */
715 if (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
)
716 dfsm_record_local_result(dfsm
, header
->count
, msg_res
, res
);
725 /* state related messages
726 * we need the right epoch - else we simply discard the message
729 dfsm_message_state_header_t
*header
= (dfsm_message_state_header_t
*)msg
;
731 if (msg_len
< sizeof(dfsm_message_state_header_t
)) {
732 cfs_dom_critical(dfsm
->log_domain
, "received short state message (type = %d, subtype = %d, %zd bytes)",
733 base_header
->type
, base_header
->subtype
, msg_len
);
737 if (base_header
->type
!= DFSM_MESSAGE_SYNC_START
&&
738 (memcmp(&header
->epoch
, &dfsm
->sync_epoch
, sizeof(dfsm_sync_epoch_t
)) != 0)) {
739 cfs_dom_debug(dfsm
->log_domain
, "ignore message (msg_type == %d) with "
740 "wrong epoch (epoch %d/%d/%08X)", base_header
->type
,
741 header
->epoch
.nodeid
, header
->epoch
.pid
, header
->epoch
.epoch
);
745 msg
= (uint8_t *) msg
+ sizeof(dfsm_message_state_header_t
);
746 msg_len
-= sizeof(dfsm_message_state_header_t
);
748 if (mode
== DFSM_MODE_SYNCED
) {
749 if (base_header
->type
== DFSM_MESSAGE_UPDATE_COMPLETE
) {
751 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++)
752 dfsm
->sync_info
->nodes
[i
].synced
= 1;
754 if (!dfsm_deliver_queue(dfsm
))
759 } else if (base_header
->type
== DFSM_MESSAGE_VERIFY_REQUEST
) {
761 if (msg_len
!= sizeof(dfsm
->csum_counter
)) {
762 cfs_dom_critical(dfsm
->log_domain
, "cpg received verify request with wrong length (%zd bytes) form node %d/%d", msg_len
, nodeid
, pid
);
766 uint64_t csum_id
= *((uint64_t *)msg
);
767 msg
= (uint8_t *) msg
+ 8; msg_len
-= 8;
769 cfs_dom_debug(dfsm
->log_domain
, "got verify request from node %d %016" PRIX64
, nodeid
, csum_id
);
771 if (dfsm
->dfsm_callbacks
->dfsm_checksum_fn
) {
772 if (!dfsm
->dfsm_callbacks
->dfsm_checksum_fn(
773 dfsm
, dfsm
->data
, dfsm
->csum
, sizeof(dfsm
->csum
))) {
774 cfs_dom_critical(dfsm
->log_domain
, "unable to compute data checksum");
778 dfsm
->csum_epoch
= header
->epoch
;
779 dfsm
->csum_id
= csum_id
;
781 if (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
) {
782 if (!dfsm_send_checksum(dfsm
))
789 } else if (base_header
->type
== DFSM_MESSAGE_VERIFY
) {
791 cfs_dom_debug(dfsm
->log_domain
, "received verify message");
793 if (dfsm
->dfsm_callbacks
->dfsm_checksum_fn
) {
795 if (msg_len
!= (sizeof(dfsm
->csum_id
) + sizeof(dfsm
->csum
))) {
796 cfs_dom_critical(dfsm
->log_domain
, "cpg received verify message with wrong length (%zd bytes)", msg_len
);
800 uint64_t csum_id
= *((uint64_t *)msg
);
801 msg
= (uint8_t *) msg
+ 8; msg_len
-= 8;
803 if (dfsm
->csum_id
== csum_id
&&
804 (memcmp(&dfsm
->csum_epoch
, &header
->epoch
, sizeof(dfsm_sync_epoch_t
)) == 0)) {
805 if (memcmp(msg
, dfsm
->csum
, sizeof(dfsm
->csum
)) != 0) {
806 cfs_dom_critical(dfsm
->log_domain
, "wrong checksum %016" PRIX64
" != %016" PRIX64
" - restarting",
807 *(uint64_t *)msg
, *(uint64_t *)dfsm
->csum
);
810 cfs_dom_message(dfsm
->log_domain
, "data verification successful");
813 cfs_dom_message(dfsm
->log_domain
, "skip verification - no checksum saved");
820 /* ignore (we already got all required updates, or we are leader) */
821 cfs_dom_debug(dfsm
->log_domain
, "ignore state sync message %d",
826 } else if (mode
== DFSM_MODE_START_SYNC
) {
828 if (base_header
->type
== DFSM_MESSAGE_SYNC_START
) {
/* only the member with the lowest nodeid may start a sync */
830 if (nodeid
!= dfsm
->lowest_nodeid
) {
831 cfs_dom_critical(dfsm
->log_domain
, "ignore sync request from wrong member %d/%d",
835 cfs_dom_message(dfsm
->log_domain
, "received sync request (epoch %d/%d/%08X)",
836 header
->epoch
.nodeid
, header
->epoch
.pid
, header
->epoch
.epoch
);
838 dfsm
->sync_epoch
= header
->epoch
;
840 dfsm_release_sync_resources(dfsm
, NULL
, 0);
842 unsigned int state_len
= 0;
843 gpointer state
= NULL
;
845 state
= dfsm
->dfsm_callbacks
->dfsm_get_state_fn(dfsm
, dfsm
->data
, &state_len
);
847 if (!(state
&& state_len
)) {
848 cfs_dom_critical(dfsm
->log_domain
, "dfsm_get_state_fn failed");
853 iov
[0].iov_base
= state
;
854 iov
[0].iov_len
= state_len
;
856 result
= dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_STATE
, iov
, 1);
866 } else if (base_header
->type
== DFSM_MESSAGE_STATE
) {
868 dfsm_node_info_t
*ni
;
870 if (!(ni
= dfsm_node_info_lookup(dfsm
, nodeid
, pid
))) {
871 cfs_dom_critical(dfsm
->log_domain
, "received state for non-member %d/%d", nodeid
, pid
);
876 cfs_dom_critical(dfsm
->log_domain
, "received duplicate state for member %d/%d", nodeid
, pid
);
/* keep a private copy of the sender's state blob */
880 ni
->state
= g_memdup2(msg
, msg_len
);
881 ni
->state_len
= msg_len
;
883 int received_all
= 1;
884 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++) {
885 if (!dfsm
->sync_info
->nodes
[i
].state
) {
892 cfs_dom_message(dfsm
->log_domain
, "received all states");
894 int res
= dfsm
->dfsm_callbacks
->dfsm_process_state_update_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
898 if (dfsm
->sync_info
->local
->synced
) {
899 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
900 dfsm_release_sync_resources(dfsm
, NULL
, 0);
902 if (!dfsm_deliver_queue(dfsm
))
906 dfsm_set_mode(dfsm
, DFSM_MODE_UPDATE
);
908 if (!dfsm_deliver_queue(dfsm
))
917 } else if (mode
== DFSM_MODE_UPDATE
) {
919 if (base_header
->type
== DFSM_MESSAGE_UPDATE
) {
921 int res
= dfsm
->dfsm_callbacks
->dfsm_process_update_fn(
922 dfsm
, dfsm
->data
, dfsm
->sync_info
, nodeid
, pid
, msg
, msg_len
);
929 } else if (base_header
->type
== DFSM_MESSAGE_UPDATE_COMPLETE
) {
932 int res
= dfsm
->dfsm_callbacks
->dfsm_commit_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
937 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++)
938 dfsm
->sync_info
->nodes
[i
].synced
= 1;
940 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
/* deliver sync_queue first, then msg_queue */
942 if (!dfsm_deliver_sync_queue(dfsm
))
945 if (!dfsm_deliver_queue(dfsm
))
948 dfsm_release_sync_resources(dfsm
, NULL
, 0);
954 cfs_dom_critical(dfsm
->log_domain
, "internal error - unknown mode %d", mode
);
958 if (base_header
->type
== DFSM_MESSAGE_VERIFY_REQUEST
||
959 base_header
->type
== DFSM_MESSAGE_VERIFY
) {
961 cfs_dom_debug(dfsm
->log_domain
, "ignore verify message %d while not synced", base_header
->type
);
964 cfs_dom_critical(dfsm
->log_domain
, "received unknown state message type (type = %d, %zd bytes)",
965 base_header
->type
, msg_len
);
970 dfsm_set_mode(dfsm
, DFSM_MODE_LEAVE
);
971 dfsm_release_sync_resources(dfsm
, NULL
, 0);
/* Re-send the messages in msg_queue that originate from this process
 * (matching nodeid and pid) through dfsm_send_message_full(), each as a
 * single iovec entry, then discard the whole queue with
 * dfsm_free_message_queue().
 * Returns FALSE when dfsm or dfsm->msg_queue is NULL.
 * NOTE(review): the handling of a failed send and the return statement
 * are on lines not visible in this excerpt. */
976 dfsm_resend_queue(dfsm_t
*dfsm
)
978 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
979 g_return_val_if_fail(dfsm
->msg_queue
!= NULL
, FALSE
);
981 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
982 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
985 while (iter
!= end
) {
986 GSequenceIter
*cur
= iter
;
987 iter
= g_sequence_iter_next(iter
);
989 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
/* only messages that this process sent are re-sent */
992 if (qm
->nodeid
== dfsm
->nodeid
&& qm
->pid
== dfsm
->pid
) {
995 iov
[0].iov_base
= qm
->msg
;
996 iov
[0].iov_len
= qm
->msg_len
;
998 if ((result
= dfsm_send_message_full(dfsm
, iov
, 1, 1)) != CS_OK
) {
1005 dfsm_free_message_queue(dfsm
);
/* Deliver the messages collected in sync_queue.
 * Each entry is passed to dfsm_cpg_deliver_callback() as long as res is
 * still true and the mode is DFSM_MODE_SYNCED; every entry is freed with
 * dfsm_free_queue_entry().  Finally the list is freed and sync_queue is
 * reset to NULL.
 * Returns FALSE when dfsm is NULL.
 * NOTE(review): the early return for an empty queue, the loop header, the
 * code that may clear res and the final return are on lines not visible
 * in this excerpt. */
1011 dfsm_deliver_sync_queue(dfsm_t
*dfsm
)
1013 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1015 if (!dfsm
->sync_queue
)
1018 gboolean res
= TRUE
;
1021 cfs_dom_message(dfsm
->log_domain
, "%s: queue length %d", __func__
,
1022 g_list_length(dfsm
->sync_queue
));
1024 GList
*iter
= dfsm
->sync_queue
;
1026 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)iter
->data
;
1027 iter
= g_list_next(iter
);
/* the mode is re-read each time around; delivery happens only while SYNCED */
1029 if (res
&& dfsm
->mode
== DFSM_MODE_SYNCED
) {
1030 dfsm_cpg_deliver_callback(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
,
1031 qm
->nodeid
, qm
->pid
, qm
->msg
, qm
->msg_len
);
1036 dfsm_free_queue_entry(qm
);
1038 g_list_free(dfsm
->sync_queue
);
1039 dfsm
->sync_queue
= NULL
;
/* Process the messages waiting in msg_queue.
 * For each entry the sender is looked up in the member table; the code
 * shows entries from non-members being logged, freed and removed.  For
 * the remaining entries:
 *  - in DFSM_MODE_SYNCED the message is delivered through
 *    dfsm_cpg_deliver_callback(), then freed and removed;
 *  - in DFSM_MODE_UPDATE the entry is moved to sync_queue (removed from
 *    the sequence without being freed).
 * Returns FALSE when dfsm or dfsm->msg_queue is NULL.
 * NOTE(review): several conditions (e.g. the test on ni and on the
 * sender's synced flag) and the return statement are on lines not visible
 * in this excerpt. */
1045 dfsm_deliver_queue(dfsm_t
*dfsm
)
1047 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1048 g_return_val_if_fail(dfsm
->msg_queue
!= NULL
, FALSE
);
1050 int qlen
= g_sequence_get_length(dfsm
->msg_queue
);
1054 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
1055 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
1056 gboolean res
= TRUE
;
1059 cfs_dom_message(dfsm
->log_domain
, "%s: queue length %d", __func__
, qlen
);
1061 while (iter
!= end
) {
/* advance first so that removing cur does not disturb the iteration */
1062 GSequenceIter
*cur
= iter
;
1063 iter
= g_sequence_iter_next(iter
);
1065 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
1066 g_sequence_get(cur
);
1068 dfsm_node_info_t
*ni
= dfsm_node_info_lookup(dfsm
, qm
->nodeid
, qm
->pid
);
1070 cfs_dom_message(dfsm
->log_domain
, "remove message from non-member %d/%d",
1071 qm
->nodeid
, qm
->pid
);
1072 dfsm_free_queue_entry(qm
);
1073 g_sequence_remove(cur
);
1077 if (dfsm
->mode
== DFSM_MODE_SYNCED
) {
1079 dfsm_cpg_deliver_callback(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
,
1080 qm
->nodeid
, qm
->pid
, qm
->msg
, qm
->msg_len
);
1081 dfsm_free_queue_entry(qm
);
1082 g_sequence_remove(cur
);
1084 } else if (dfsm
->mode
== DFSM_MODE_UPDATE
) {
/* ownership of qm moves to sync_queue, so it is not freed here */
1086 dfsm
->sync_queue
= g_list_append(dfsm
->sync_queue
, qm
);
1087 g_sequence_remove(cur
);
/* CPG configuration-change callback (registered as .cpg_confchg_fn in
 * cpg_callbacks): reacts to a change of the group membership.
 *
 * Visible flow:
 *  - Fetch and validate the dfsm handle via cpg_context_get().
 *  - Start a new local sync epoch (counter, nodeid, pid, time), invalidate
 *    the saved checksum and drop sync_queue.
 *  - Ignore the event in error modes.
 *  - Scan member_list: build a printable member list, determine the
 *    lowest nodeid and whether this process is a member.
 *  - Rebuild the member bookkeeping with dfsm_release_sync_resources().
 *  - With more than one member: optionally re-send the queue (when nodes
 *    joined and the queue is non-empty), switch to DFSM_MODE_START_SYNC
 *    and, if this node has the lowest nodeid, send SYNC_START.
 *    Otherwise switch directly to DFSM_MODE_SYNCED and deliver the queue.
 *  - Call the optional dfsm_confchg_fn callback.
 *  - The tail of the function switches to DFSM_MODE_LEAVE.
 * NOTE(review): return/goto/else lines and labels are not visible in this
 * excerpt, so the exact exit path taken by each branch cannot be
 * confirmed from here. */
1099 dfsm_cpg_confchg_callback(
1100 cpg_handle_t handle
,
1101 const struct cpg_name
*group_name
,
1102 const struct cpg_address
*member_list
,
1103 size_t member_list_entries
,
1104 const struct cpg_address
*left_list
,
1105 size_t left_list_entries
,
1106 const struct cpg_address
*joined_list
,
1107 size_t joined_list_entries
)
1111 dfsm_t
*dfsm
= NULL
;
1112 result
= cpg_context_get(handle
, (gpointer
*)&dfsm
);
1113 if (result
!= CS_OK
|| !dfsm
|| dfsm
->cpg_callbacks
!= &cpg_callbacks
) {
1114 cfs_critical("cpg_context_get error: %d (%p)", result
, (void *) dfsm
);
1115 return; /* we have no valid dfsm pointer, so we can just ignore this */
/* membership is re-derived from member_list below */
1118 dfsm
->we_are_member
= 0;
1120 /* create new epoch */
1121 dfsm
->local_epoch_counter
++;
1122 dfsm
->sync_epoch
.epoch
= dfsm
->local_epoch_counter
;
1123 dfsm
->sync_epoch
.nodeid
= dfsm
->nodeid
;
1124 dfsm
->sync_epoch
.pid
= dfsm
->pid
;
1125 dfsm
->sync_epoch
.time
= time(NULL
);
1127 /* invalidate saved checksum */
1128 dfsm
->csum_id
= dfsm
->csum_counter
;
1129 memset(&dfsm
->csum_epoch
, 0, sizeof(dfsm
->csum_epoch
));
1131 dfsm_free_sync_queue(dfsm
);
1133 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1135 cfs_dom_debug(dfsm
->log_domain
, "dfsm mode is %d", mode
);
1137 if (mode
>= DFSM_ERROR_MODE_START
) {
1138 cfs_dom_debug(dfsm
->log_domain
, "already left group - ignore message");
/* 0 serves as "not yet set" while searching for the lowest nodeid */
1142 int lowest_nodeid
= 0;
1143 GString
*member_ids
= g_string_new(NULL
);
1144 for (int i
= 0; i
< member_list_entries
; i
++) {
1146 g_string_append_printf(member_ids
, i
? ", %d/%d" : "%d/%d",
1147 member_list
[i
].nodeid
, member_list
[i
].pid
);
1149 if (lowest_nodeid
== 0 || lowest_nodeid
> member_list
[i
].nodeid
)
1150 lowest_nodeid
= member_list
[i
].nodeid
;
1152 if (member_list
[i
].nodeid
== dfsm
->nodeid
&&
1153 member_list
[i
].pid
== dfsm
->pid
)
1154 dfsm
->we_are_member
= 1;
1158 if ((dfsm
->we_are_member
|| mode
!= DFSM_MODE_START
))
1159 cfs_dom_message(dfsm
->log_domain
, "members: %s", member_ids
->str
);
1161 g_string_free(member_ids
, 1);
1163 dfsm
->lowest_nodeid
= lowest_nodeid
;
1165 /* NOTE: one node can be in left and joined list at the same time,
1166 so it is better to query member list. Also JOIN/LEAVE list are
1167 different on different nodes!
1170 dfsm_release_sync_resources(dfsm
, member_list
, member_list_entries
);
1172 if (!dfsm
->we_are_member
) {
1173 if (mode
== DFSM_MODE_START
) {
1174 cfs_dom_debug(dfsm
->log_domain
, "ignore leave message");
1177 cfs_dom_message(dfsm
->log_domain
, "we (%d/%d) left the process group",
1178 dfsm
->nodeid
, dfsm
->pid
);
1182 if (member_list_entries
> 1) {
1184 int qlen
= g_sequence_get_length(dfsm
->msg_queue
);
1185 if (joined_list_entries
&& qlen
) {
1186 /* we need to make sure that all members have the same queue. */
1187 cfs_dom_message(dfsm
->log_domain
, "queue not emtpy - resening %d messages", qlen
);
1188 if (!dfsm_resend_queue(dfsm
)) {
1189 cfs_dom_critical(dfsm
->log_domain
, "dfsm_resend_queue failed");
1194 dfsm_set_mode(dfsm
, DFSM_MODE_START_SYNC
);
/* the member with the lowest nodeid starts the sync round */
1195 if (lowest_nodeid
== dfsm
->nodeid
) {
1196 if (dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_SYNC_START
, NULL
, 0) != CS_OK
) {
1197 cfs_dom_critical(dfsm
->log_domain
, "failed to send SYNC_START message");
1202 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
1203 dfsm
->sync_info
->local
->synced
= 1;
1204 if (!dfsm_deliver_queue(dfsm
))
/* the membership notification callback is optional */
1208 if (dfsm
->dfsm_callbacks
->dfsm_confchg_fn
)
1209 dfsm
->dfsm_callbacks
->dfsm_confchg_fn(dfsm
, dfsm
->data
, member_list
, member_list_entries
);
1213 dfsm_set_mode(dfsm
, DFSM_MODE_LEAVE
);
/* Callback table for the CPG library: message delivery and configuration
 * changes are routed to the two dfsm callbacks.  The address of this
 * table is also compared against dfsm->cpg_callbacks in both callbacks to
 * validate the handle context. */
1217 static cpg_callbacks_t cpg_callbacks
= {
1218 .cpg_deliver_fn
= dfsm_cpg_deliver_callback
,
1219 .cpg_confchg_fn
= dfsm_cpg_confchg_callback
,
1225 const char *group_name
,
1226 const char *log_domain
,
1227 guint32 protocol_version
,
1228 dfsm_callbacks_t
*callbacks
)
1230 g_return_val_if_fail(sizeof(dfsm_message_header_t
) == 16, NULL
);
1231 g_return_val_if_fail(sizeof(dfsm_message_state_header_t
) == 32, NULL
);
1232 g_return_val_if_fail(sizeof(dfsm_message_normal_header_t
) == 24, NULL
);
1234 g_return_val_if_fail(callbacks
!= NULL
, NULL
);
1235 g_return_val_if_fail(callbacks
->dfsm_deliver_fn
!= NULL
, NULL
);
1237 g_return_val_if_fail(callbacks
->dfsm_get_state_fn
!= NULL
, NULL
);
1238 g_return_val_if_fail(callbacks
->dfsm_process_state_update_fn
!= NULL
, NULL
);
1239 g_return_val_if_fail(callbacks
->dfsm_process_update_fn
!= NULL
, NULL
);
1240 g_return_val_if_fail(callbacks
->dfsm_commit_fn
!= NULL
, NULL
);
1244 if ((dfsm
= g_new0(dfsm_t
, 1)) == NULL
)
1247 g_mutex_init(&dfsm
->sync_mutex
);
1249 g_cond_init(&dfsm
->sync_cond
);
1251 if (!(dfsm
->results
= g_hash_table_new(g_int64_hash
, g_int64_equal
)))
1254 if (!(dfsm
->msg_queue
= g_sequence_new(NULL
)))
1257 g_mutex_init(&dfsm
->cpg_mutex
);
1259 dfsm
->log_domain
= log_domain
;
1261 dfsm
->mode
= DFSM_MODE_START
;
1262 dfsm
->protocol_version
= protocol_version
;
1263 strcpy (dfsm
->cpg_group_name
.value
, group_name
);
1264 dfsm
->cpg_group_name
.length
= strlen (group_name
) + 1;
1266 dfsm
->cpg_callbacks
= &cpg_callbacks
;
1267 dfsm
->dfsm_callbacks
= callbacks
;
1269 dfsm
->members
= g_hash_table_new(dfsm_sync_info_hash
, dfsm_sync_info_equal
);
1273 g_mutex_init(&dfsm
->mode_mutex
);
/* Report whether a CPG handle is currently held: TRUE when
 * dfsm->cpg_handle is non-zero, FALSE otherwise or when dfsm is NULL. */
1283 dfsm_is_initialized(dfsm_t
*dfsm
)
1285 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1287 return (dfsm
->cpg_handle
!= 0) ? TRUE
: FALSE
;
/* Test whether this node currently holds the lowest nodeid of the group:
 * the condition requires a non-zero lowest_nodeid that equals our own
 * nodeid.  A NULL dfsm yields FALSE.
 * NOTE(review): the return statements are on lines not visible here. */
1291 dfsm_lowest_nodeid(dfsm_t
*dfsm
)
1293 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1295 if (dfsm
->lowest_nodeid
&& (dfsm
->lowest_nodeid
== dfsm
->nodeid
))
/* Ask all members to verify their data by sending a
 * DFSM_MESSAGE_VERIFY_REQUEST whose payload is the incremented
 * csum_counter.
 *
 * The visible checks are: this node must hold the lowest nodeid, the mode
 * must be DFSM_MODE_SYNCED, and csum_counter must equal csum_id (otherwise
 * a "delay verify request" message is logged).  A failed send is logged
 * as critical.
 * Returns CS_ERR_INVALID_PARAM when dfsm is NULL.
 * NOTE(review): the declaration of len, the statements executed when a
 * check fails and the return values are on lines not visible here. */
1302 dfsm_verify_request(dfsm_t
*dfsm
)
1304 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1306 /* only do when we have lowest nodeid */
1307 if (!dfsm
->lowest_nodeid
|| (dfsm
->lowest_nodeid
!= dfsm
->nodeid
))
1310 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1311 if (mode
!= DFSM_MODE_SYNCED
)
1315 struct iovec iov
[len
];
/* counter and id differ: the previous request has not been handled yet */
1317 if (dfsm
->csum_counter
!= dfsm
->csum_id
) {
1318 g_message("delay verify request %016" PRIX64
, dfsm
->csum_counter
+ 1);
1322 dfsm
->csum_counter
++;
1323 iov
[0].iov_base
= (char *)&dfsm
->csum_counter
;
1324 iov
[0].iov_len
= sizeof(dfsm
->csum_counter
);
1326 cfs_debug("send verify request %016" PRIX64
, dfsm
->csum_counter
);
1329 result
= dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_VERIFY_REQUEST
, iov
, len
);
1331 if (result
!= CS_OK
)
1332 cfs_dom_critical(dfsm
->log_domain
, "failed to send VERIFY_REQUEST message");
1341 cs_dispatch_flags_t dispatch_types
)
1343 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1344 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, CS_ERR_INVALID_PARAM
);
1348 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1351 result
= cpg_dispatch(dfsm
->cpg_handle
, dispatch_types
);
1352 if (result
== CS_ERR_TRY_AGAIN
) {
1353 nanosleep(&tvreq
, NULL
);
1355 if ((retries
% 10) == 0)
1356 cfs_dom_message(dfsm
->log_domain
, "cpg_dispatch retry %d", retries
);
1360 if (!(result
== CS_OK
|| result
== CS_ERR_TRY_AGAIN
)) {
1361 cfs_dom_critical(dfsm
->log_domain
, "cpg_dispatch failed: %d", result
);
1369 dfsm_initialize(dfsm_t
*dfsm
, int *fd
)
1371 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1372 g_return_val_if_fail(fd
!= NULL
, CS_ERR_INVALID_PARAM
);
1374 /* remove old messages */
1375 dfsm_free_message_queue(dfsm
);
1376 dfsm_send_sync_message_abort(dfsm
);
1378 dfsm
->joined
= FALSE
;
1379 dfsm
->we_are_member
= 0;
1381 dfsm_set_mode(dfsm
, DFSM_MODE_START
);
1385 if (dfsm
->cpg_handle
== 0) {
1386 if ((result
= cpg_initialize(&dfsm
->cpg_handle
, dfsm
->cpg_callbacks
)) != CS_OK
) {
1387 cfs_dom_critical(dfsm
->log_domain
, "cpg_initialize failed: %d", result
);
1388 goto err_no_finalize
;
1391 if ((result
= cpg_local_get(dfsm
->cpg_handle
, &dfsm
->nodeid
)) != CS_OK
) {
1392 cfs_dom_critical(dfsm
->log_domain
, "cpg_local_get failed: %d", result
);
1396 dfsm
->pid
= getpid();
1398 result
= cpg_context_set(dfsm
->cpg_handle
, dfsm
);
1399 if (result
!= CS_OK
) {
1400 cfs_dom_critical(dfsm
->log_domain
, "cpg_context_set failed: %d", result
);
1405 result
= cpg_fd_get(dfsm
->cpg_handle
, fd
);
1406 if (result
!= CS_OK
) {
1407 cfs_dom_critical(dfsm
->log_domain
, "cpg_fd_get failed: %d", result
);
1414 cpg_finalize(dfsm
->cpg_handle
);
1416 dfsm
->cpg_handle
= 0;
1421 dfsm_join(dfsm_t
*dfsm
)
1423 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1424 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, CS_ERR_LIBRARY
);
1425 g_return_val_if_fail(dfsm
->joined
== 0, CS_ERR_EXIST
);
1429 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1432 g_mutex_lock (&dfsm
->cpg_mutex
);
1433 result
= cpg_join(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
);
1434 g_mutex_unlock (&dfsm
->cpg_mutex
);
1435 if (result
== CS_ERR_TRY_AGAIN
) {
1436 nanosleep(&tvreq
, NULL
);
1438 if ((retries
% 10) == 0)
1439 cfs_dom_message(dfsm
->log_domain
, "cpg_join retry %d", retries
);
1443 if (result
!= CS_OK
) {
1444 cfs_dom_critical(dfsm
->log_domain
, "cpg_join failed: %d", result
);
1448 dfsm
->joined
= TRUE
;
1453 dfsm_leave (dfsm_t
*dfsm
)
1455 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1456 g_return_val_if_fail(dfsm
->joined
, CS_ERR_NOT_EXIST
);
1460 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1463 g_mutex_lock (&dfsm
->cpg_mutex
);
1464 result
= cpg_leave(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
);
1465 g_mutex_unlock (&dfsm
->cpg_mutex
);
1466 if (result
== CS_ERR_TRY_AGAIN
) {
1467 nanosleep(&tvreq
, NULL
);
1469 if ((retries
% 10) == 0)
1470 cfs_dom_message(dfsm
->log_domain
, "cpg_leave retry %d", retries
);
1474 if (result
!= CS_OK
) {
1475 cfs_dom_critical(dfsm
->log_domain
, "cpg_leave failed: %d", result
);
1479 dfsm
->joined
= FALSE
;
/*
 * Shut down the CPG connection of a dfsm.
 *
 * Calls dfsm_send_sync_message_abort() and, if a CPG handle is held,
 * finalizes it and resets cpg_handle, joined and we_are_member.
 *
 * A NULL dfsm fails the precondition check and yields FALSE.
 */
1485 dfsm_finalize(dfsm_t
*dfsm
)
1487 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1489 dfsm_send_sync_message_abort(dfsm
);
/* drop the handle and forget any membership state that came with it */
1494 if (dfsm
->cpg_handle
) {
1495 cpg_finalize(dfsm
->cpg_handle
);
1496 dfsm
->cpg_handle
= 0;
1497 dfsm
->joined
= FALSE
;
1498 dfsm
->we_are_member
= 0;
1505 dfsm_destroy(dfsm_t
*dfsm
)
1507 g_return_if_fail(dfsm
!= NULL
);
1509 dfsm_finalize(dfsm
);
1511 if (dfsm
->sync_info
&& dfsm
->sync_info
->data
&& dfsm
->dfsm_callbacks
->dfsm_cleanup_fn
)
1512 dfsm
->dfsm_callbacks
->dfsm_cleanup_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
1514 dfsm_free_sync_queue(dfsm
);
1516 g_mutex_clear (&dfsm
->mode_mutex
);
1518 g_mutex_clear (&dfsm
->sync_mutex
);
1520 g_cond_clear (&dfsm
->sync_cond
);
1522 g_mutex_clear (&dfsm
->cpg_mutex
);
1525 g_hash_table_destroy(dfsm
->results
);
1527 if (dfsm
->msg_queue
) {
1528 dfsm_free_message_queue(dfsm
);
1529 g_sequence_free(dfsm
->msg_queue
);
1532 if (dfsm
->sync_info
)
1533 g_free(dfsm
->sync_info
);
1535 if (dfsm
->cpg_handle
)
1536 cpg_finalize(dfsm
->cpg_handle
);
1539 g_hash_table_destroy(dfsm
->members
);
1546 } service_dfsm_private_t
;
1549 service_dfsm_finalize(
1550 cfs_service_t
*service
,
1553 g_return_val_if_fail(service
!= NULL
, FALSE
);
1554 g_return_val_if_fail(context
!= NULL
, FALSE
);
1556 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1557 dfsm_t
*dfsm
= private->dfsm
;
1559 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1561 return dfsm_finalize(dfsm
);
1565 service_dfsm_initialize(
1566 cfs_service_t
*service
,
1569 g_return_val_if_fail(service
!= NULL
, -1);
1570 g_return_val_if_fail(context
!= NULL
, -1);
1572 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1573 dfsm_t
*dfsm
= private->dfsm
;
1575 g_return_val_if_fail(dfsm
!= NULL
, -1);
1577 /* serious internal error - don't try to recover */
1578 if (!dfsm_restartable(dfsm
))
1584 if ((result
= dfsm_initialize(dfsm
, &fd
)) != CS_OK
)
1587 result
= dfsm_join(dfsm
);
1588 if (result
!= CS_OK
) {
1589 /* we can't dispatch if not joined, so we need to finalize */
1590 dfsm_finalize(dfsm
);
/*
 * Service dispatch callback: handles one pending CPG event through
 * dfsm_dispatch(dfsm, CS_DISPATCH_ONE).
 *
 * CS_ERR_LIBRARY and CS_ERR_BAD_HANDLE are told apart from other failures,
 * both for the dispatch itself and for the dfsm_leave() that is attempted
 * once the state machine has entered an error mode
 * (mode >= DFSM_ERROR_MODE_START).
 *
 * Returns FALSE if service, context, the wrapped dfsm or its CPG handle is
 * missing.
 *
 * NOTE(review): presumably the error paths end in the dfsm_finalize() and
 * cfs_service_set_restartable() calls at the bottom and return FALSE -
 * confirm against the goto targets.
 */
1598 service_dfsm_dispatch(
1599 cfs_service_t
*service
,
1602 g_return_val_if_fail(service
!= NULL
, FALSE
);
1603 g_return_val_if_fail(context
!= NULL
, FALSE
);
1605 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1606 dfsm_t
*dfsm
= private->dfsm
;
1608 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1609 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, FALSE
);
1613 result
= dfsm_dispatch(dfsm
, CS_DISPATCH_ONE
);
1614 if (result
== CS_ERR_LIBRARY
|| result
== CS_ERR_BAD_HANDLE
)
1616 if (result
!= CS_OK
)
/* modes at or above DFSM_ERROR_MODE_START are the abnormal/error modes */
1619 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1620 if (mode
>= DFSM_ERROR_MODE_START
) {
1622 result
= dfsm_leave(dfsm
);
1623 if (result
== CS_ERR_LIBRARY
|| result
== CS_ERR_BAD_HANDLE
)
1625 if (result
!= CS_OK
)
1628 if (!dfsm
->we_are_member
)
1636 dfsm_finalize(dfsm
);
/* tell the service layer whether this dfsm may be restarted */
1638 cfs_service_set_restartable(service
, dfsm_restartable(dfsm
));
1644 cfs_service_t
*service
,
1647 g_return_if_fail(service
!= NULL
);
1648 g_return_if_fail(context
!= NULL
);
1650 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1651 dfsm_t
*dfsm
= private->dfsm
;
1653 g_return_if_fail(dfsm
!= NULL
);
1655 dfsm_verify_request(dfsm
);
/*
 * Callback table passed to cfs_service_new() by service_dfsm_new(); it
 * binds the generic service hooks to the dfsm-specific handlers of this
 * file.
 */
1658 static cfs_service_callbacks_t cfs_dfsm_callbacks
= {
1659 .cfs_service_initialize_fn
= service_dfsm_initialize
,
1660 .cfs_service_finalize_fn
= service_dfsm_finalize
,
1661 .cfs_service_dispatch_fn
= service_dfsm_dispatch
,
1662 .cfs_service_timer_fn
= service_dfsm_timer
,
1666 service_dfsm_new(dfsm_t
*dfsm
)
1668 cfs_service_t
*service
;
1670 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
1672 service_dfsm_private_t
*private = g_new0(service_dfsm_private_t
, 1);
1676 private->dfsm
= dfsm
;
1678 service
= cfs_service_new(&cfs_dfsm_callbacks
, dfsm
->log_domain
, private);
1684 service_dfsm_destroy(cfs_service_t
*service
)
1686 g_return_if_fail(service
!= NULL
);
1688 service_dfsm_private_t
*private =
1689 (service_dfsm_private_t
*)cfs_service_get_context(service
);