2 Copyright (C) 2010 Proxmox Server Solutions GmbH
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 Author: Dietmar Maurer <dietmar@proxmox.com>
22 /* NOTE: we try to keep the CPG handle as long as possible, because
23 * calling cpg_initialize/cpg_finalize multiple times from the
24 * same process confuses corosync.
25 * Note: CS_ERR_LIBRARY is returned when corosync died
30 #endif /* HAVE_CONFIG_H */
32 #include <sys/types.h>
38 #include <corosync/corotypes.h>
39 #include <corosync/cpg.h>
42 #include "cfs-utils.h"
45 static cpg_callbacks_t cpg_callbacks
;
49 DFSM_MODE_START_SYNC
= 1,
53 /* values >= 128 indicate abnormal/error conditions */
54 DFSM_ERROR_MODE_START
= 128,
55 DFSM_MODE_LEAVE
= 253,
56 DFSM_MODE_VERSION_ERROR
= 254,
57 DFSM_MODE_ERROR
= 255,
61 DFSM_MESSAGE_NORMAL
= 0,
62 DFSM_MESSAGE_SYNC_START
= 1,
63 DFSM_MESSAGE_STATE
= 2,
64 DFSM_MESSAGE_UPDATE
= 3,
65 DFSM_MESSAGE_UPDATE_COMPLETE
= 4,
66 DFSM_MESSAGE_VERIFY_REQUEST
= 5,
67 DFSM_MESSAGE_VERIFY
= 6,
/* True when mt is one of the state-machine message types, i.e. it lies in
 * the range [DFSM_MESSAGE_SYNC_START, DFSM_MESSAGE_VERIFY];
 * DFSM_MESSAGE_NORMAL (0) is excluded.
 * The argument is parenthesized so that expression arguments (conditional,
 * bitwise, ...) expand with the intended precedence. Note that mt is still
 * evaluated twice, so do not pass expressions with side effects. */
#define DFSM_VALID_STATE_MESSAGE(mt) ((mt) >= DFSM_MESSAGE_SYNC_START && (mt) <= DFSM_MESSAGE_VERIFY)
75 uint32_t protocol_version
;
78 } dfsm_message_header_t
;
81 uint32_t epoch
; // per process (not globally unique)
88 dfsm_message_header_t base
;
89 dfsm_sync_epoch_t epoch
;
90 } dfsm_message_state_header_t
;
93 dfsm_message_header_t base
;
95 } dfsm_message_normal_header_t
;
102 int msg_len
; // fixme: unsigned?
103 } dfsm_queued_message_t
;
106 const char *log_domain
;
107 cpg_callbacks_t
*cpg_callbacks
;
108 dfsm_callbacks_t
*dfsm_callbacks
;
109 cpg_handle_t cpg_handle
;
110 struct cpg_name cpg_group_name
;
115 guint32 protocol_version
;
120 /* mode is protected with mode_mutex */
124 GHashTable
*members
; /* contains dfsm_node_info_t pointers */
125 dfsm_sync_info_t
*sync_info
;
126 uint32_t local_epoch_counter
;
127 dfsm_sync_epoch_t sync_epoch
;
128 uint32_t lowest_nodeid
;
129 GSequence
*msg_queue
;
132 /* synchronous message transmission, protected with sync_mutex */
137 uint64_t msgcount_rcvd
;
139 /* state verification */
141 dfsm_sync_epoch_t csum_epoch
;
143 uint64_t csum_counter
;
146 static gboolean
dfsm_deliver_queue(dfsm_t
*dfsm
);
147 static gboolean
dfsm_deliver_sync_queue(dfsm_t
*dfsm
);
150 dfsm_nodeid_is_local(
155 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
157 return (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
);
162 dfsm_send_sync_message_abort(dfsm_t
*dfsm
)
164 g_return_if_fail(dfsm
!= NULL
);
166 g_mutex_lock (&dfsm
->sync_mutex
);
167 dfsm
->msgcount_rcvd
= dfsm
->msgcount
;
168 g_cond_broadcast (&dfsm
->sync_cond
);
169 g_mutex_unlock (&dfsm
->sync_mutex
);
173 dfsm_record_local_result(
179 g_return_if_fail(dfsm
!= NULL
);
180 g_return_if_fail(dfsm
->results
!= NULL
);
182 g_mutex_lock (&dfsm
->sync_mutex
);
183 dfsm_result_t
*rp
= (dfsm_result_t
*)g_hash_table_lookup(dfsm
->results
, &msg_count
);
185 rp
->result
= msg_result
;
186 rp
->processed
= processed
;
188 dfsm
->msgcount_rcvd
= msg_count
;
189 g_cond_broadcast (&dfsm
->sync_cond
);
190 g_mutex_unlock (&dfsm
->sync_mutex
);
194 dfsm_send_message_full(
200 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
201 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
203 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
207 result
= cpg_mcast_joined(dfsm
->cpg_handle
, CPG_TYPE_AGREED
, iov
, len
);
208 if (retry
&& result
== CS_ERR_TRY_AGAIN
) {
209 nanosleep(&tvreq
, NULL
);
211 if ((retries
% 10) == 0)
212 cfs_dom_message(dfsm
->log_domain
, "cpg_send_message retry %d", retries
);
218 cfs_dom_message(dfsm
->log_domain
, "cpg_send_message retried %d times", retries
);
220 if (result
!= CS_OK
&&
221 (!retry
|| result
!= CS_ERR_TRY_AGAIN
))
222 cfs_dom_critical(dfsm
->log_domain
, "cpg_send_message failed: %d", result
);
228 dfsm_send_state_message_full(
234 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
235 g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type
), CS_ERR_INVALID_PARAM
);
236 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
238 dfsm_message_state_header_t header
;
239 header
.base
.type
= type
;
240 header
.base
.subtype
= 0;
241 header
.base
.protocol_version
= dfsm
->protocol_version
;
242 header
.base
.time
= time(NULL
);
243 header
.base
.reserved
= 0;
245 header
.epoch
= dfsm
->sync_epoch
;
247 struct iovec real_iov
[len
+ 1];
249 real_iov
[0].iov_base
= (char *)&header
;
250 real_iov
[0].iov_len
= sizeof(header
);
252 for (int i
= 0; i
< len
; i
++)
253 real_iov
[i
+ 1] = iov
[i
];
255 return dfsm_send_message_full(dfsm
, real_iov
, len
+ 1, 1);
264 return dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_UPDATE
, iov
, len
);
268 dfsm_send_update_complete(dfsm_t
*dfsm
)
270 return dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_UPDATE_COMPLETE
, NULL
, 0);
281 return dfsm_send_message_sync(dfsm
, msgtype
, iov
, len
, NULL
);
285 dfsm_send_message_sync(
292 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
293 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
295 g_mutex_lock (&dfsm
->sync_mutex
);
296 /* note: hold lock until message is sent - to guarantee ordering */
297 uint64_t msgcount
= ++dfsm
->msgcount
;
299 rp
->msgcount
= msgcount
;
301 g_hash_table_replace(dfsm
->results
, &rp
->msgcount
, rp
);
304 dfsm_message_normal_header_t header
;
305 header
.base
.type
= DFSM_MESSAGE_NORMAL
;
306 header
.base
.subtype
= msgtype
;
307 header
.base
.protocol_version
= dfsm
->protocol_version
;
308 header
.base
.time
= time(NULL
);
309 header
.base
.reserved
= 0;
310 header
.count
= msgcount
;
312 struct iovec real_iov
[len
+ 1];
314 real_iov
[0].iov_base
= (char *)&header
;
315 real_iov
[0].iov_len
= sizeof(header
);
317 for (int i
= 0; i
< len
; i
++)
318 real_iov
[i
+ 1] = iov
[i
];
320 cs_error_t result
= dfsm_send_message_full(dfsm
, real_iov
, len
+ 1, 1);
322 g_mutex_unlock (&dfsm
->sync_mutex
);
324 if (result
!= CS_OK
) {
325 cfs_dom_critical(dfsm
->log_domain
, "cpg_send_message failed: %d", result
);
328 g_mutex_lock (&dfsm
->sync_mutex
);
329 g_hash_table_remove(dfsm
->results
, &rp
->msgcount
);
330 g_mutex_unlock (&dfsm
->sync_mutex
);
336 g_mutex_lock (&dfsm
->sync_mutex
);
338 while (dfsm
->msgcount_rcvd
< msgcount
)
339 g_cond_wait (&dfsm
->sync_cond
, &dfsm
->sync_mutex
);
342 g_hash_table_remove(dfsm
->results
, &rp
->msgcount
);
344 g_mutex_unlock (&dfsm
->sync_mutex
);
346 return rp
->processed
? CS_OK
: CS_ERR_FAILED_OPERATION
;
353 dfsm_send_checksum(dfsm_t
*dfsm
)
355 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
358 struct iovec iov
[len
];
360 iov
[0].iov_base
= (char *)&dfsm
->csum_id
;
361 iov
[0].iov_len
= sizeof(dfsm
->csum_id
);
362 iov
[1].iov_base
= dfsm
->csum
;
363 iov
[1].iov_len
= sizeof(dfsm
->csum
);
365 gboolean res
= (dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_VERIFY
, iov
, len
) == CS_OK
);
371 dfsm_free_queue_entry(gpointer data
)
373 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)data
;
379 dfsm_free_message_queue(dfsm_t
*dfsm
)
381 g_return_if_fail(dfsm
!= NULL
);
382 g_return_if_fail(dfsm
->msg_queue
!= NULL
);
384 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
385 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
386 while (iter
!= end
) {
387 GSequenceIter
*cur
= iter
;
388 iter
= g_sequence_iter_next(iter
);
389 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
391 dfsm_free_queue_entry(qm
);
392 g_sequence_remove(cur
);
397 dfsm_free_sync_queue(dfsm_t
*dfsm
)
399 g_return_if_fail(dfsm
!= NULL
);
401 GList
*iter
= dfsm
->sync_queue
;
403 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)iter
->data
;
404 iter
= g_list_next(iter
);
405 dfsm_free_queue_entry(qm
);
408 g_list_free(dfsm
->sync_queue
);
409 dfsm
->sync_queue
= NULL
;
413 message_queue_sort_fn(
418 return ((dfsm_queued_message_t
*)a
)->msg_count
-
419 ((dfsm_queued_message_t
*)b
)->msg_count
;
422 static dfsm_node_info_t
*
423 dfsm_node_info_lookup(
428 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
429 g_return_val_if_fail(dfsm
->members
!= NULL
, NULL
);
431 dfsm_node_info_t info
= { .nodeid
= nodeid
, .pid
= pid
};
433 return (dfsm_node_info_t
*)g_hash_table_lookup(dfsm
->members
, &info
);
436 static dfsm_queued_message_t
*
437 dfsm_queue_add_message(
445 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
446 g_return_val_if_fail(msg
!= NULL
, NULL
);
447 g_return_val_if_fail(msg_len
!= 0, NULL
);
449 dfsm_node_info_t
*ni
= dfsm_node_info_lookup(dfsm
, nodeid
, pid
);
451 cfs_dom_critical(dfsm
->log_domain
, "dfsm_node_info_lookup failed");
455 dfsm_queued_message_t
*qm
= g_new0(dfsm_queued_message_t
, 1);
456 g_return_val_if_fail(qm
!= NULL
, NULL
);
460 qm
->msg
= g_memdup (msg
, msg_len
);
461 qm
->msg_len
= msg_len
;
462 qm
->msg_count
= msg_count
;
464 if (dfsm
->mode
== DFSM_MODE_UPDATE
&& ni
->synced
) {
465 dfsm
->sync_queue
= g_list_append(dfsm
->sync_queue
, qm
);
467 /* NOTE: we only need to sort the queue because we resend all
468 * queued messages sometimes.
470 g_sequence_insert_sorted(dfsm
->msg_queue
, qm
, message_queue_sort_fn
, NULL
);
477 dfsm_sync_info_hash(gconstpointer key
)
479 dfsm_node_info_t
*info
= (dfsm_node_info_t
*)key
;
481 return g_int_hash(&info
->nodeid
) + g_int_hash(&info
->pid
);
485 dfsm_sync_info_equal(
489 dfsm_node_info_t
*info1
= (dfsm_node_info_t
*)v1
;
490 dfsm_node_info_t
*info2
= (dfsm_node_info_t
*)v2
;
492 if (info1
->nodeid
== info2
->nodeid
&&
493 info1
->pid
== info2
->pid
)
500 dfsm_sync_info_compare(
504 dfsm_node_info_t
*info1
= (dfsm_node_info_t
*)v1
;
505 dfsm_node_info_t
*info2
= (dfsm_node_info_t
*)v2
;
507 if (info1
->nodeid
!= info2
->nodeid
)
508 return info1
->nodeid
- info2
->nodeid
;
510 return info1
->pid
- info2
->pid
;
516 dfsm_mode_t new_mode
)
518 g_return_if_fail(dfsm
!= NULL
);
520 cfs_debug("dfsm_set_mode - set mode to %d", new_mode
);
523 g_mutex_lock (&dfsm
->mode_mutex
);
524 if (dfsm
->mode
!= new_mode
) {
525 if (new_mode
< DFSM_ERROR_MODE_START
||
526 (dfsm
->mode
< DFSM_ERROR_MODE_START
|| new_mode
>= dfsm
->mode
)) {
527 dfsm
->mode
= new_mode
;
531 g_mutex_unlock (&dfsm
->mode_mutex
);
536 if (new_mode
== DFSM_MODE_START
) {
537 cfs_dom_message(dfsm
->log_domain
, "start cluster connection");
538 } else if (new_mode
== DFSM_MODE_START_SYNC
) {
539 cfs_dom_message(dfsm
->log_domain
, "starting data syncronisation");
540 } else if (new_mode
== DFSM_MODE_SYNCED
) {
541 cfs_dom_message(dfsm
->log_domain
, "all data is up to date");
542 if (dfsm
->dfsm_callbacks
->dfsm_synced_fn
)
543 dfsm
->dfsm_callbacks
->dfsm_synced_fn(dfsm
);
544 } else if (new_mode
== DFSM_MODE_UPDATE
) {
545 cfs_dom_message(dfsm
->log_domain
, "waiting for updates from leader");
546 } else if (new_mode
== DFSM_MODE_LEAVE
) {
547 cfs_dom_critical(dfsm
->log_domain
, "leaving CPG group");
548 } else if (new_mode
== DFSM_MODE_ERROR
) {
549 cfs_dom_critical(dfsm
->log_domain
, "serious internal error - stop cluster connection");
550 } else if (new_mode
== DFSM_MODE_VERSION_ERROR
) {
551 cfs_dom_critical(dfsm
->log_domain
, "detected newer protocol - please update this node");
556 dfsm_get_mode(dfsm_t
*dfsm
)
558 g_return_val_if_fail(dfsm
!= NULL
, DFSM_MODE_ERROR
);
560 g_mutex_lock (&dfsm
->mode_mutex
);
561 dfsm_mode_t mode
= dfsm
->mode
;
562 g_mutex_unlock (&dfsm
->mode_mutex
);
568 dfsm_restartable(dfsm_t
*dfsm
)
570 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
572 return !(mode
== DFSM_MODE_ERROR
||
573 mode
== DFSM_MODE_VERSION_ERROR
);
577 dfsm_set_errormode(dfsm_t
*dfsm
)
579 dfsm_set_mode(dfsm
, DFSM_MODE_ERROR
);
583 dfsm_release_sync_resources(
585 const struct cpg_address
*member_list
,
586 size_t member_list_entries
)
588 g_return_if_fail(dfsm
!= NULL
);
589 g_return_if_fail(dfsm
->members
!= NULL
);
590 g_return_if_fail(!member_list_entries
|| member_list
!= NULL
);
592 cfs_debug("enter dfsm_release_sync_resources");
594 if (dfsm
->sync_info
) {
596 if (dfsm
->sync_info
->data
&& dfsm
->dfsm_callbacks
->dfsm_cleanup_fn
) {
597 dfsm
->dfsm_callbacks
->dfsm_cleanup_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
598 dfsm
->sync_info
->data
= NULL
;
601 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++) {
602 if (dfsm
->sync_info
->nodes
[i
].state
) {
603 g_free(dfsm
->sync_info
->nodes
[i
].state
);
604 dfsm
->sync_info
->nodes
[i
].state
= NULL
;
605 dfsm
->sync_info
->nodes
[i
].state_len
= 0;
612 g_hash_table_remove_all(dfsm
->members
);
615 g_free(dfsm
->sync_info
);
617 int size
= sizeof(dfsm_sync_info_t
) +
618 member_list_entries
*sizeof(dfsm_sync_info_t
);
619 dfsm_sync_info_t
*sync_info
= dfsm
->sync_info
= g_malloc0(size
);
620 sync_info
->node_count
= member_list_entries
;
622 for (int i
= 0; i
< member_list_entries
; i
++) {
623 sync_info
->nodes
[i
].nodeid
= member_list
[i
].nodeid
;
624 sync_info
->nodes
[i
].pid
= member_list
[i
].pid
;
627 qsort(sync_info
->nodes
, member_list_entries
, sizeof(dfsm_node_info_t
),
628 dfsm_sync_info_compare
);
630 for (int i
= 0; i
< member_list_entries
; i
++) {
631 dfsm_node_info_t
*info
= &sync_info
->nodes
[i
];
632 g_hash_table_insert(dfsm
->members
, info
, info
);
633 if (info
->nodeid
== dfsm
->nodeid
&& info
->pid
== dfsm
->pid
)
634 sync_info
->local
= info
;
640 dfsm_cpg_deliver_callback(
642 const struct cpg_name
*group_name
,
651 result
= cpg_context_get(handle
, (gpointer
*)&dfsm
);
652 if (result
!= CS_OK
|| !dfsm
|| dfsm
->cpg_callbacks
!= &cpg_callbacks
) {
653 cfs_critical("cpg_context_get error: %d (%p)", result
, (void *) dfsm
);
654 return; /* we have no valid dfsm pointer, so we can just ignore this */
656 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
658 cfs_dom_debug(dfsm
->log_domain
, "dfsm mode is %d", mode
);
660 if (mode
>= DFSM_ERROR_MODE_START
) {
661 cfs_dom_debug(dfsm
->log_domain
, "error mode - ignoring message");
665 if (!dfsm
->sync_info
) {
666 cfs_dom_critical(dfsm
->log_domain
, "no dfsm_sync_info - internal error");
670 if (msg_len
< sizeof(dfsm_message_header_t
)) {
671 cfs_dom_critical(dfsm
->log_domain
, "received short message (%zd bytes)", msg_len
);
675 dfsm_message_header_t
*base_header
= (dfsm_message_header_t
*)msg
;
677 if (base_header
->protocol_version
> dfsm
->protocol_version
) {
678 cfs_dom_critical(dfsm
->log_domain
, "received message with protocol version %d",
679 base_header
->protocol_version
);
680 dfsm_set_mode(dfsm
, DFSM_MODE_VERSION_ERROR
);
682 } else if (base_header
->protocol_version
< dfsm
->protocol_version
) {
683 cfs_dom_message(dfsm
->log_domain
, "ignore message with wrong protocol version %d",
684 base_header
->protocol_version
);
688 if (base_header
->type
== DFSM_MESSAGE_NORMAL
) {
690 dfsm_message_normal_header_t
*header
= (dfsm_message_normal_header_t
*)msg
;
692 if (msg_len
< sizeof(dfsm_message_normal_header_t
)) {
693 cfs_dom_critical(dfsm
->log_domain
, "received short message (type = %d, subtype = %d, %zd bytes)",
694 base_header
->type
, base_header
->subtype
, msg_len
);
698 if (mode
!= DFSM_MODE_SYNCED
) {
699 cfs_dom_debug(dfsm
->log_domain
, "queue message %" PRIu64
" (subtype = %d, length = %zd)",
700 header
->count
, base_header
->subtype
, msg_len
);
702 if (!dfsm_queue_add_message(dfsm
, nodeid
, pid
, header
->count
, msg
, msg_len
))
707 int res
= dfsm
->dfsm_callbacks
->dfsm_deliver_fn(
708 dfsm
, dfsm
->data
, &msg_res
, nodeid
, pid
, base_header
->subtype
,
709 base_header
->time
, msg
+ sizeof(dfsm_message_normal_header_t
),
710 msg_len
- sizeof(dfsm_message_normal_header_t
));
712 if (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
)
713 dfsm_record_local_result(dfsm
, header
->count
, msg_res
, res
);
722 /* state related messages
723 * we need the right epoch - else we simply discard the message
726 dfsm_message_state_header_t
*header
= (dfsm_message_state_header_t
*)msg
;
728 if (msg_len
< sizeof(dfsm_message_state_header_t
)) {
729 cfs_dom_critical(dfsm
->log_domain
, "received short state message (type = %d, subtype = %d, %zd bytes)",
730 base_header
->type
, base_header
->subtype
, msg_len
);
734 if (base_header
->type
!= DFSM_MESSAGE_SYNC_START
&&
735 (memcmp(&header
->epoch
, &dfsm
->sync_epoch
, sizeof(dfsm_sync_epoch_t
)) != 0)) {
736 cfs_dom_debug(dfsm
->log_domain
, "ignore message (msg_type == %d) with "
737 "wrong epoch (epoch %d/%d/%08X)", base_header
->type
,
738 header
->epoch
.nodeid
, header
->epoch
.pid
, header
->epoch
.epoch
);
742 msg
+= sizeof(dfsm_message_state_header_t
);
743 msg_len
-= sizeof(dfsm_message_state_header_t
);
745 if (mode
== DFSM_MODE_SYNCED
) {
746 if (base_header
->type
== DFSM_MESSAGE_UPDATE_COMPLETE
) {
748 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++)
749 dfsm
->sync_info
->nodes
[i
].synced
= 1;
751 if (!dfsm_deliver_queue(dfsm
))
756 } else if (base_header
->type
== DFSM_MESSAGE_VERIFY_REQUEST
) {
758 if (msg_len
!= sizeof(dfsm
->csum_counter
)) {
759 cfs_dom_critical(dfsm
->log_domain
, "cpg received verify request with wrong length (%zd bytes) form node %d/%d", msg_len
, nodeid
, pid
);
763 uint64_t csum_id
= *((uint64_t *)msg
);
764 msg
+= 8; msg_len
-= 8;
766 cfs_dom_debug(dfsm
->log_domain
, "got verify request from node %d %016" PRIX64
, nodeid
, csum_id
);
768 if (dfsm
->dfsm_callbacks
->dfsm_checksum_fn
) {
769 if (!dfsm
->dfsm_callbacks
->dfsm_checksum_fn(
770 dfsm
, dfsm
->data
, dfsm
->csum
, sizeof(dfsm
->csum
))) {
771 cfs_dom_critical(dfsm
->log_domain
, "unable to compute data checksum");
775 dfsm
->csum_epoch
= header
->epoch
;
776 dfsm
->csum_id
= csum_id
;
778 if (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
) {
779 if (!dfsm_send_checksum(dfsm
))
786 } else if (base_header
->type
== DFSM_MESSAGE_VERIFY
) {
788 cfs_dom_debug(dfsm
->log_domain
, "received verify message");
790 if (dfsm
->dfsm_callbacks
->dfsm_checksum_fn
) {
792 if (msg_len
!= (sizeof(dfsm
->csum_id
) + sizeof(dfsm
->csum
))) {
793 cfs_dom_critical(dfsm
->log_domain
, "cpg received verify message with wrong length (%zd bytes)", msg_len
);
797 uint64_t csum_id
= *((uint64_t *)msg
);
798 msg
+= 8; msg_len
-= 8;
800 if (dfsm
->csum_id
== csum_id
&&
801 (memcmp(&dfsm
->csum_epoch
, &header
->epoch
, sizeof(dfsm_sync_epoch_t
)) == 0)) {
802 if (memcmp(msg
, dfsm
->csum
, sizeof(dfsm
->csum
)) != 0) {
803 cfs_dom_critical(dfsm
->log_domain
, "wrong checksum %016" PRIX64
" != %016" PRIX64
" - restarting",
804 *(uint64_t *)msg
, *(uint64_t *)dfsm
->csum
);
807 cfs_dom_message(dfsm
->log_domain
, "data verification successful");
810 cfs_dom_message(dfsm
->log_domain
, "skip verification - no checksum saved");
817 /* ignore (we already got all required updates, or we are leader) */
818 cfs_dom_debug(dfsm
->log_domain
, "ignore state sync message %d",
823 } else if (mode
== DFSM_MODE_START_SYNC
) {
825 if (base_header
->type
== DFSM_MESSAGE_SYNC_START
) {
827 if (nodeid
!= dfsm
->lowest_nodeid
) {
828 cfs_dom_critical(dfsm
->log_domain
, "ignore sync request from wrong member %d/%d",
832 cfs_dom_message(dfsm
->log_domain
, "received sync request (epoch %d/%d/%08X)",
833 header
->epoch
.nodeid
, header
->epoch
.pid
, header
->epoch
.epoch
);
835 dfsm
->sync_epoch
= header
->epoch
;
837 dfsm_release_sync_resources(dfsm
, NULL
, 0);
839 unsigned int state_len
= 0;
840 gpointer state
= NULL
;
842 state
= dfsm
->dfsm_callbacks
->dfsm_get_state_fn(dfsm
, dfsm
->data
, &state_len
);
844 if (!(state
&& state_len
)) {
845 cfs_dom_critical(dfsm
->log_domain
, "dfsm_get_state_fn failed");
850 iov
[0].iov_base
= state
;
851 iov
[0].iov_len
= state_len
;
853 result
= dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_STATE
, iov
, 1);
863 } else if (base_header
->type
== DFSM_MESSAGE_STATE
) {
865 dfsm_node_info_t
*ni
;
867 if (!(ni
= dfsm_node_info_lookup(dfsm
, nodeid
, pid
))) {
868 cfs_dom_critical(dfsm
->log_domain
, "received state for non-member %d/%d", nodeid
, pid
);
873 cfs_dom_critical(dfsm
->log_domain
, "received duplicate state for member %d/%d", nodeid
, pid
);
877 ni
->state
= g_memdup(msg
, msg_len
);
878 ni
->state_len
= msg_len
;
880 int received_all
= 1;
881 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++) {
882 if (!dfsm
->sync_info
->nodes
[i
].state
) {
889 cfs_dom_message(dfsm
->log_domain
, "received all states");
891 int res
= dfsm
->dfsm_callbacks
->dfsm_process_state_update_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
895 if (dfsm
->sync_info
->local
->synced
) {
896 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
897 dfsm_release_sync_resources(dfsm
, NULL
, 0);
899 if (!dfsm_deliver_queue(dfsm
))
903 dfsm_set_mode(dfsm
, DFSM_MODE_UPDATE
);
905 if (!dfsm_deliver_queue(dfsm
))
914 } else if (mode
== DFSM_MODE_UPDATE
) {
916 if (base_header
->type
== DFSM_MESSAGE_UPDATE
) {
918 int res
= dfsm
->dfsm_callbacks
->dfsm_process_update_fn(
919 dfsm
, dfsm
->data
, dfsm
->sync_info
, nodeid
, pid
, msg
, msg_len
);
926 } else if (base_header
->type
== DFSM_MESSAGE_UPDATE_COMPLETE
) {
929 int res
= dfsm
->dfsm_callbacks
->dfsm_commit_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
934 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++)
935 dfsm
->sync_info
->nodes
[i
].synced
= 1;
937 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
939 if (!dfsm_deliver_sync_queue(dfsm
))
942 if (!dfsm_deliver_queue(dfsm
))
945 dfsm_release_sync_resources(dfsm
, NULL
, 0);
951 cfs_dom_critical(dfsm
->log_domain
, "internal error - unknown mode %d", mode
);
955 if (base_header
->type
== DFSM_MESSAGE_VERIFY_REQUEST
||
956 base_header
->type
== DFSM_MESSAGE_VERIFY
) {
958 cfs_dom_debug(dfsm
->log_domain
, "ignore verify message %d while not synced", base_header
->type
);
961 cfs_dom_critical(dfsm
->log_domain
, "received unknown state message type (type = %d, %zd bytes)",
962 base_header
->type
, msg_len
);
967 dfsm_set_mode(dfsm
, DFSM_MODE_LEAVE
);
968 dfsm_release_sync_resources(dfsm
, NULL
, 0);
973 dfsm_resend_queue(dfsm_t
*dfsm
)
975 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
976 g_return_val_if_fail(dfsm
->msg_queue
!= NULL
, FALSE
);
978 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
979 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
982 while (iter
!= end
) {
983 GSequenceIter
*cur
= iter
;
984 iter
= g_sequence_iter_next(iter
);
986 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
989 if (qm
->nodeid
== dfsm
->nodeid
&& qm
->pid
== dfsm
->pid
) {
992 iov
[0].iov_base
= qm
->msg
;
993 iov
[0].iov_len
= qm
->msg_len
;
995 if ((result
= dfsm_send_message_full(dfsm
, iov
, 1, 1)) != CS_OK
) {
1002 dfsm_free_message_queue(dfsm
);
1008 dfsm_deliver_sync_queue(dfsm_t
*dfsm
)
1010 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1012 if (!dfsm
->sync_queue
)
1015 gboolean res
= TRUE
;
1018 cfs_dom_message(dfsm
->log_domain
, "%s: queue length %d", __func__
,
1019 g_list_length(dfsm
->sync_queue
));
1021 GList
*iter
= dfsm
->sync_queue
;
1023 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)iter
->data
;
1024 iter
= g_list_next(iter
);
1026 if (res
&& dfsm
->mode
== DFSM_MODE_SYNCED
) {
1027 dfsm_cpg_deliver_callback(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
,
1028 qm
->nodeid
, qm
->pid
, qm
->msg
, qm
->msg_len
);
1033 dfsm_free_queue_entry(qm
);
1035 g_list_free(dfsm
->sync_queue
);
1036 dfsm
->sync_queue
= NULL
;
1042 dfsm_deliver_queue(dfsm_t
*dfsm
)
1044 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1045 g_return_val_if_fail(dfsm
->msg_queue
!= NULL
, FALSE
);
1047 int qlen
= g_sequence_get_length(dfsm
->msg_queue
);
1051 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
1052 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
1053 gboolean res
= TRUE
;
1056 cfs_dom_message(dfsm
->log_domain
, "%s: queue length %d", __func__
, qlen
);
1058 while (iter
!= end
) {
1059 GSequenceIter
*cur
= iter
;
1060 iter
= g_sequence_iter_next(iter
);
1062 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
1063 g_sequence_get(cur
);
1065 dfsm_node_info_t
*ni
= dfsm_node_info_lookup(dfsm
, qm
->nodeid
, qm
->pid
);
1067 cfs_dom_message(dfsm
->log_domain
, "remove message from non-member %d/%d",
1068 qm
->nodeid
, qm
->pid
);
1069 dfsm_free_queue_entry(qm
);
1070 g_sequence_remove(cur
);
1074 if (dfsm
->mode
== DFSM_MODE_SYNCED
) {
1076 dfsm_cpg_deliver_callback(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
,
1077 qm
->nodeid
, qm
->pid
, qm
->msg
, qm
->msg_len
);
1078 dfsm_free_queue_entry(qm
);
1079 g_sequence_remove(cur
);
1081 } else if (dfsm
->mode
== DFSM_MODE_UPDATE
) {
1083 dfsm
->sync_queue
= g_list_append(dfsm
->sync_queue
, qm
);
1084 g_sequence_remove(cur
);
1096 dfsm_cpg_confchg_callback(
1097 cpg_handle_t handle
,
1098 const struct cpg_name
*group_name
,
1099 const struct cpg_address
*member_list
,
1100 size_t member_list_entries
,
1101 const struct cpg_address
*left_list
,
1102 size_t left_list_entries
,
1103 const struct cpg_address
*joined_list
,
1104 size_t joined_list_entries
)
1108 dfsm_t
*dfsm
= NULL
;
1109 result
= cpg_context_get(handle
, (gpointer
*)&dfsm
);
1110 if (result
!= CS_OK
|| !dfsm
|| dfsm
->cpg_callbacks
!= &cpg_callbacks
) {
1111 cfs_critical("cpg_context_get error: %d (%p)", result
, (void *) dfsm
);
1112 return; /* we have no valid dfsm pointer, so we can just ignore this */
1115 dfsm
->we_are_member
= 0;
1117 /* create new epoch */
1118 dfsm
->local_epoch_counter
++;
1119 dfsm
->sync_epoch
.epoch
= dfsm
->local_epoch_counter
;
1120 dfsm
->sync_epoch
.nodeid
= dfsm
->nodeid
;
1121 dfsm
->sync_epoch
.pid
= dfsm
->pid
;
1122 dfsm
->sync_epoch
.time
= time(NULL
);
1124 /* invalidate saved checksum */
1125 dfsm
->csum_id
= dfsm
->csum_counter
;
1126 memset(&dfsm
->csum_epoch
, 0, sizeof(dfsm
->csum_epoch
));
1128 dfsm_free_sync_queue(dfsm
);
1130 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1132 cfs_dom_debug(dfsm
->log_domain
, "dfsm mode is %d", mode
);
1134 if (mode
>= DFSM_ERROR_MODE_START
) {
1135 cfs_dom_debug(dfsm
->log_domain
, "already left group - ignore message");
1139 int lowest_nodeid
= 0;
1140 GString
*member_ids
= g_string_new(NULL
);
1141 for (int i
= 0; i
< member_list_entries
; i
++) {
1143 g_string_append_printf(member_ids
, i
? ", %d/%d" : "%d/%d",
1144 member_list
[i
].nodeid
, member_list
[i
].pid
);
1146 if (lowest_nodeid
== 0 || lowest_nodeid
> member_list
[i
].nodeid
)
1147 lowest_nodeid
= member_list
[i
].nodeid
;
1149 if (member_list
[i
].nodeid
== dfsm
->nodeid
&&
1150 member_list
[i
].pid
== dfsm
->pid
)
1151 dfsm
->we_are_member
= 1;
1155 if ((dfsm
->we_are_member
|| mode
!= DFSM_MODE_START
))
1156 cfs_dom_message(dfsm
->log_domain
, "members: %s", member_ids
->str
);
1158 g_string_free(member_ids
, 1);
1160 dfsm
->lowest_nodeid
= lowest_nodeid
;
1162 /* NOTE: one node can be in the left and joined lists at the same time,
1163 so it is better to query the member list. Also, the JOIN/LEAVE lists are
1164 different on different nodes!
1167 dfsm_release_sync_resources(dfsm
, member_list
, member_list_entries
);
1169 if (!dfsm
->we_are_member
) {
1170 if (mode
== DFSM_MODE_START
) {
1171 cfs_dom_debug(dfsm
->log_domain
, "ignore leave message");
1174 cfs_dom_message(dfsm
->log_domain
, "we (%d/%d) left the process group",
1175 dfsm
->nodeid
, dfsm
->pid
);
1179 if (member_list_entries
> 1) {
1181 int qlen
= g_sequence_get_length(dfsm
->msg_queue
);
1182 if (joined_list_entries
&& qlen
) {
1183 /* we need to make sure that all members have the same queue. */
1184 cfs_dom_message(dfsm
->log_domain
, "queue not emtpy - resening %d messages", qlen
);
1185 if (!dfsm_resend_queue(dfsm
)) {
1186 cfs_dom_critical(dfsm
->log_domain
, "dfsm_resend_queue failed");
1191 dfsm_set_mode(dfsm
, DFSM_MODE_START_SYNC
);
1192 if (lowest_nodeid
== dfsm
->nodeid
) {
1193 if (!dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_SYNC_START
, NULL
, 0)) {
1194 cfs_dom_critical(dfsm
->log_domain
, "failed to send SYNC_START message");
1199 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
1200 dfsm
->sync_info
->local
->synced
= 1;
1201 if (!dfsm_deliver_queue(dfsm
))
1205 if (dfsm
->dfsm_callbacks
->dfsm_confchg_fn
)
1206 dfsm
->dfsm_callbacks
->dfsm_confchg_fn(dfsm
, dfsm
->data
, member_list
, member_list_entries
);
1210 dfsm_set_mode(dfsm
, DFSM_MODE_LEAVE
);
1214 static cpg_callbacks_t cpg_callbacks
= {
1215 .cpg_deliver_fn
= dfsm_cpg_deliver_callback
,
1216 .cpg_confchg_fn
= dfsm_cpg_confchg_callback
,
1222 const char *group_name
,
1223 const char *log_domain
,
1224 guint32 protocol_version
,
1225 dfsm_callbacks_t
*callbacks
)
1227 g_return_val_if_fail(sizeof(dfsm_message_header_t
) == 16, NULL
);
1228 g_return_val_if_fail(sizeof(dfsm_message_state_header_t
) == 32, NULL
);
1229 g_return_val_if_fail(sizeof(dfsm_message_normal_header_t
) == 24, NULL
);
1231 g_return_val_if_fail(callbacks
!= NULL
, NULL
);
1232 g_return_val_if_fail(callbacks
->dfsm_deliver_fn
!= NULL
, NULL
);
1234 g_return_val_if_fail(callbacks
->dfsm_get_state_fn
!= NULL
, NULL
);
1235 g_return_val_if_fail(callbacks
->dfsm_process_state_update_fn
!= NULL
, NULL
);
1236 g_return_val_if_fail(callbacks
->dfsm_process_update_fn
!= NULL
, NULL
);
1237 g_return_val_if_fail(callbacks
->dfsm_commit_fn
!= NULL
, NULL
);
1241 if ((dfsm
= g_new0(dfsm_t
, 1)) == NULL
)
1244 g_mutex_init(&dfsm
->sync_mutex
);
1246 g_cond_init(&dfsm
->sync_cond
);
1248 if (!(dfsm
->results
= g_hash_table_new(g_int64_hash
, g_int64_equal
)))
1251 if (!(dfsm
->msg_queue
= g_sequence_new(NULL
)))
1254 dfsm
->log_domain
= log_domain
;
1256 dfsm
->mode
= DFSM_MODE_START
;
1257 dfsm
->protocol_version
= protocol_version
;
1258 strcpy (dfsm
->cpg_group_name
.value
, group_name
);
1259 dfsm
->cpg_group_name
.length
= strlen (group_name
) + 1;
1261 dfsm
->cpg_callbacks
= &cpg_callbacks
;
1262 dfsm
->dfsm_callbacks
= callbacks
;
1264 dfsm
->members
= g_hash_table_new(dfsm_sync_info_hash
, dfsm_sync_info_equal
);
1268 g_mutex_init(&dfsm
->mode_mutex
);
1278 dfsm_is_initialized(dfsm_t
*dfsm
)
1280 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1282 return (dfsm
->cpg_handle
!= 0) ? TRUE
: FALSE
;
1286 dfsm_lowest_nodeid(dfsm_t
*dfsm
)
1288 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1290 if (dfsm
->lowest_nodeid
&& (dfsm
->lowest_nodeid
== dfsm
->nodeid
))
1297 dfsm_verify_request(dfsm_t
*dfsm
)
1299 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1301 /* only do when we have lowest nodeid */
1302 if (!dfsm
->lowest_nodeid
|| (dfsm
->lowest_nodeid
!= dfsm
->nodeid
))
1305 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1306 if (mode
!= DFSM_MODE_SYNCED
)
1310 struct iovec iov
[len
];
1312 if (dfsm
->csum_counter
!= dfsm
->csum_id
) {
1313 g_message("delay verify request %016" PRIX64
, dfsm
->csum_counter
+ 1);
1317 dfsm
->csum_counter
++;
1318 iov
[0].iov_base
= (char *)&dfsm
->csum_counter
;
1319 iov
[0].iov_len
= sizeof(dfsm
->csum_counter
);
1321 cfs_debug("send verify request %016" PRIX64
, dfsm
->csum_counter
);
1324 result
= dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_VERIFY_REQUEST
, iov
, len
);
1326 if (result
!= CS_OK
)
1327 cfs_dom_critical(dfsm
->log_domain
, "failed to send VERIFY_REQUEST message");
1336 cs_dispatch_flags_t dispatch_types
)
1338 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1339 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, CS_ERR_INVALID_PARAM
);
1343 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1346 result
= cpg_dispatch(dfsm
->cpg_handle
, dispatch_types
);
1347 if (result
== CS_ERR_TRY_AGAIN
) {
1348 nanosleep(&tvreq
, NULL
);
1350 if ((retries
% 10) == 0)
1351 cfs_dom_message(dfsm
->log_domain
, "cpg_dispatch retry %d", retries
);
1355 if (!(result
== CS_OK
|| result
== CS_ERR_TRY_AGAIN
)) {
1356 cfs_dom_critical(dfsm
->log_domain
, "cpg_dispatch failed: %d", result
);
/*
 * Reset the local join state and set up the CPG connection.
 *
 * dfsm: state machine to initialize.
 * fd:   output location handed to cpg_fd_get() for the CPG file descriptor.
 *
 * A NULL dfsm or fd yields CS_ERR_INVALID_PARAM through
 * g_return_val_if_fail(). Each failing cpg_* call is logged as critical
 * together with its numeric result.
 *
 * NOTE(review): braces, labels and most goto/return statements are not
 * visible in this view, so the exact nesting of the calls below and the
 * value returned on each path cannot be confirmed from here.
 */
1364 dfsm_initialize(dfsm_t
*dfsm
, int *fd
)
1366 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1367 g_return_val_if_fail(fd
!= NULL
, CS_ERR_INVALID_PARAM
);
1369 /* remove old messages */
1370 dfsm_free_message_queue(dfsm
);
1371 dfsm_send_sync_message_abort(dfsm
);
/* start over as "not joined / not a member" in DFSM_MODE_START */
1373 dfsm
->joined
= FALSE
;
1374 dfsm
->we_are_member
= 0;
1376 dfsm_set_mode(dfsm
, DFSM_MODE_START
);
/*
 * cpg_initialize() is only called while no handle is stored; a non-zero
 * handle is kept (see the note at the top of this file about keeping the
 * CPG handle as long as possible).
 */
1380 if (dfsm
->cpg_handle
== 0) {
1381 if ((result
= cpg_initialize(&dfsm
->cpg_handle
, dfsm
->cpg_callbacks
)) != CS_OK
) {
1382 cfs_dom_critical(dfsm
->log_domain
, "cpg_initialize failed: %d", result
);
1383 goto err_no_finalize
;
/* store the node id reported by cpg_local_get() in dfsm->nodeid */
1386 if ((result
= cpg_local_get(dfsm
->cpg_handle
, &dfsm
->nodeid
)) != CS_OK
) {
1387 cfs_dom_critical(dfsm
->log_domain
, "cpg_local_get failed: %d", result
);
1391 dfsm
->pid
= getpid();
/* register dfsm itself as the context attached to the CPG handle */
1393 result
= cpg_context_set(dfsm
->cpg_handle
, dfsm
);
1394 if (result
!= CS_OK
) {
1395 cfs_dom_critical(dfsm
->log_domain
, "cpg_context_set failed: %d", result
);
1400 result
= cpg_fd_get(dfsm
->cpg_handle
, fd
);
1401 if (result
!= CS_OK
) {
1402 cfs_dom_critical(dfsm
->log_domain
, "cpg_fd_get failed: %d", result
);
/*
 * NOTE(review): presumably the error cleanup tail (the label lines are not
 * visible): finalize the handle and forget it - confirm.
 */
1409 cpg_finalize(dfsm
->cpg_handle
);
1411 dfsm
->cpg_handle
= 0;
/*
 * Join the CPG group named by dfsm->cpg_group_name.
 *
 * Precondition failures are reported through g_return_val_if_fail():
 *   - dfsm is NULL          -> CS_ERR_INVALID_PARAM
 *   - cpg_handle is zero    -> CS_ERR_LIBRARY
 *   - joined is already set -> CS_ERR_EXIST
 *
 * On CS_ERR_TRY_AGAIN the code sleeps for tvreq (100000000 ns, i.e. 100 ms)
 * and logs a message whenever the retry count is a multiple of 10. A result
 * other than CS_OK is logged as critical.
 *
 * NOTE(review): the declaration and update of 'retries', the jump that
 * repeats cpg_join(), and the return statements are not visible in this
 * view; presumably dfsm->joined is only set on the CS_OK path - confirm.
 */
1416 dfsm_join(dfsm_t
*dfsm
)
1418 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1419 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, CS_ERR_LIBRARY
);
1420 g_return_val_if_fail(dfsm
->joined
== 0, CS_ERR_EXIST
);
/* delay used between attempts: 0 s + 100000000 ns */
1424 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1427 result
= cpg_join(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
);
1428 if (result
== CS_ERR_TRY_AGAIN
) {
1429 nanosleep(&tvreq
, NULL
);
/* only every 10th retry is logged */
1431 if ((retries
% 10) == 0)
1432 cfs_dom_message(dfsm
->log_domain
, "cpg_join retry %d", retries
);
1436 if (result
!= CS_OK
) {
1437 cfs_dom_critical(dfsm
->log_domain
, "cpg_join failed: %d", result
);
1441 dfsm
->joined
= TRUE
;
/*
 * Leave the CPG group named by dfsm->cpg_group_name.
 *
 * Precondition failures are reported through g_return_val_if_fail():
 *   - dfsm is NULL      -> CS_ERR_INVALID_PARAM
 *   - joined is not set -> CS_ERR_NOT_EXIST
 *
 * On CS_ERR_TRY_AGAIN the code sleeps for tvreq (100000000 ns, i.e. 100 ms)
 * and logs a message whenever the retry count is a multiple of 10. A result
 * other than CS_OK is logged as critical.
 *
 * NOTE(review): the declaration and update of 'retries', the jump that
 * repeats cpg_leave(), and the return statements are not visible in this
 * view; presumably dfsm->joined is only cleared on the CS_OK path - confirm.
 */
1446 dfsm_leave (dfsm_t
*dfsm
)
1448 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1449 g_return_val_if_fail(dfsm
->joined
, CS_ERR_NOT_EXIST
);
/* delay used between attempts: 0 s + 100000000 ns */
1453 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1456 result
= cpg_leave(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
);
1457 if (result
== CS_ERR_TRY_AGAIN
) {
1458 nanosleep(&tvreq
, NULL
);
/* only every 10th retry is logged */
1460 if ((retries
% 10) == 0)
1461 cfs_dom_message(dfsm
->log_domain
, "cpg_leave retry %d", retries
);
1465 if (result
!= CS_OK
) {
1466 cfs_dom_critical(dfsm
->log_domain
, "cpg_leave failed: %d", result
);
1470 dfsm
->joined
= FALSE
;
/*
 * Drop the CPG connection held by the state machine.
 *
 * A NULL dfsm yields FALSE through g_return_val_if_fail(). The function
 * first calls dfsm_send_sync_message_abort(); then, if a CPG handle is
 * stored, it finalizes the handle and resets cpg_handle, joined and
 * we_are_member.
 *
 * NOTE(review): the lines between the abort call and the handle check, and
 * the return statement, are not visible in this view - confirm what is
 * returned on the normal path.
 */
1476 dfsm_finalize(dfsm_t
*dfsm
)
1478 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1480 dfsm_send_sync_message_abort(dfsm
);
/* with the handle gone we are neither joined nor a group member */
1485 if (dfsm
->cpg_handle
) {
1486 cpg_finalize(dfsm
->cpg_handle
);
1487 dfsm
->cpg_handle
= 0;
1488 dfsm
->joined
= FALSE
;
1489 dfsm
->we_are_member
= 0;
/*
 * Tear down a state machine: finalize the CPG connection, run the cleanup
 * callback, and release the queues, synchronization primitives and hash
 * tables it owns.
 *
 * A NULL dfsm makes the function return early via g_return_if_fail().
 *
 * NOTE(review): some lines of this function are not visible in this view
 * (e.g. possible NULL guards in front of the g_hash_table_destroy() calls
 * and the release of dfsm itself) - confirm against the full source.
 */
1496 dfsm_destroy(dfsm_t
*dfsm
)
1498 g_return_if_fail(dfsm
!= NULL
);
1500 dfsm_finalize(dfsm
);
/*
 * The cleanup callback is only invoked when sync_info exists, its data
 * field is set and the callback itself is provided.
 */
1502 if (dfsm
->sync_info
&& dfsm
->sync_info
->data
&& dfsm
->dfsm_callbacks
->dfsm_cleanup_fn
)
1503 dfsm
->dfsm_callbacks
->dfsm_cleanup_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
1505 dfsm_free_sync_queue(dfsm
);
1507 g_mutex_clear (&dfsm
->mode_mutex
);
1509 g_mutex_clear (&dfsm
->sync_mutex
);
1511 g_cond_clear (&dfsm
->sync_cond
);
1514 g_hash_table_destroy(dfsm
->results
);
/* drop the queued messages first, then the sequence container itself */
1516 if (dfsm
->msg_queue
) {
1517 dfsm_free_message_queue(dfsm
);
1518 g_sequence_free(dfsm
->msg_queue
);
1521 if (dfsm
->sync_info
)
1522 g_free(dfsm
->sync_info
);
/*
 * NOTE(review): the visible part of dfsm_finalize() already zeroes
 * cpg_handle when it finalizes it, so this looks like a safety net for a
 * handle that is still set - confirm.
 */
1524 if (dfsm
->cpg_handle
)
1525 cpg_finalize(dfsm
->cpg_handle
);
1528 g_hash_table_destroy(dfsm
->members
);
1535 } service_dfsm_private_t
;
/*
 * Service callback (registered as .cfs_service_finalize_fn): finalize the
 * DFSM stored in the service's private context.
 *
 * Returns FALSE through g_return_val_if_fail() when service, context or the
 * stored dfsm pointer is NULL; otherwise returns the result of
 * dfsm_finalize().
 *
 * NOTE(review): the declaration line of the 'context' parameter is not
 * visible in this view.
 */
1538 service_dfsm_finalize(
1539 cfs_service_t
*service
,
1542 g_return_val_if_fail(service
!= NULL
, FALSE
);
1543 g_return_val_if_fail(context
!= NULL
, FALSE
);
/* context is the service_dfsm_private_t holding the dfsm pointer */
1545 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1546 dfsm_t
*dfsm
= private->dfsm
;
1548 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1550 return dfsm_finalize(dfsm
);
/*
 * Service callback (registered as .cfs_service_initialize_fn): initialize
 * the DFSM stored in the private context and join its CPG group.
 *
 * Returns -1 through g_return_val_if_fail() when service, context or the
 * stored dfsm pointer is NULL.
 *
 * NOTE(review): the declarations of 'context', 'fd' and 'result' and all
 * return statements are not visible in this view; presumably the function
 * returns the descriptor filled in by dfsm_initialize() on success and a
 * negative value on failure - confirm.
 */
1554 service_dfsm_initialize(
1555 cfs_service_t
*service
,
1558 g_return_val_if_fail(service
!= NULL
, -1);
1559 g_return_val_if_fail(context
!= NULL
, -1);
/* context is the service_dfsm_private_t holding the dfsm pointer */
1561 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1562 dfsm_t
*dfsm
= private->dfsm
;
1564 g_return_val_if_fail(dfsm
!= NULL
, -1);
1566 /* serious internal error - don't try to recover */
1567 if (!dfsm_restartable(dfsm
))
/* dfsm_initialize() receives &fd as the output for the CPG descriptor */
1573 if ((result
= dfsm_initialize(dfsm
, &fd
)) != CS_OK
)
1576 result
= dfsm_join(dfsm
);
1577 if (result
!= CS_OK
) {
1578 /* we can't dispatch if not joined, so we need to finalize */
1579 dfsm_finalize(dfsm
);
/*
 * Service callback (registered as .cfs_service_dispatch_fn): dispatch one
 * CPG event for the DFSM stored in the private context and react to error
 * conditions.
 *
 * Returns FALSE through g_return_val_if_fail() when service, context or the
 * stored dfsm pointer is NULL, or when the dfsm has no CPG handle.
 *
 * NOTE(review): the statements guarded by the checks below, the labels and
 * the return statements are not visible in this view, so which checks lead
 * to the dfsm_finalize() / cfs_service_set_restartable() tail cannot be
 * confirmed from here.
 */
1587 service_dfsm_dispatch(
1588 cfs_service_t
*service
,
1591 g_return_val_if_fail(service
!= NULL
, FALSE
);
1592 g_return_val_if_fail(context
!= NULL
, FALSE
);
/* context is the service_dfsm_private_t holding the dfsm pointer */
1594 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1595 dfsm_t
*dfsm
= private->dfsm
;
1597 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1598 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, FALSE
);
/* CS_DISPATCH_ONE is the dispatch type passed on to dfsm_dispatch() */
1602 result
= dfsm_dispatch(dfsm
, CS_DISPATCH_ONE
);
/*
 * CS_ERR_LIBRARY / CS_ERR_BAD_HANDLE are tested separately from other
 * failures; per the note at the top of this file, CS_ERR_LIBRARY is what
 * is returned when corosync died.
 */
1603 if (result
== CS_ERR_LIBRARY
|| result
== CS_ERR_BAD_HANDLE
)
1605 if (result
!= CS_OK
)
/* modes >= DFSM_ERROR_MODE_START (128) are the abnormal/error modes */
1608 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1609 if (mode
>= DFSM_ERROR_MODE_START
) {
/* in an error mode the group is left via dfsm_leave() */
1611 result
= dfsm_leave(dfsm
);
1612 if (result
== CS_ERR_LIBRARY
|| result
== CS_ERR_BAD_HANDLE
)
1614 if (result
!= CS_OK
)
1617 if (!dfsm
->we_are_member
)
/*
 * Tear the connection down and pass the current dfsm_restartable() value
 * on to the service.
 */
1625 dfsm_finalize(dfsm
);
1627 cfs_service_set_restartable(service
, dfsm_restartable(dfsm
));
/*
 * Service callback that triggers a verify request on the DFSM stored in the
 * private context by calling dfsm_verify_request().
 *
 * Returns early via g_return_if_fail() when service, context or the stored
 * dfsm pointer is NULL. The result of dfsm_verify_request() is ignored.
 *
 * NOTE(review): the line carrying the function name is not visible in this
 * view; presumably this is service_dfsm_timer(), the function registered as
 * .cfs_service_timer_fn in cfs_dfsm_callbacks - confirm.
 */
1633 cfs_service_t
*service
,
1636 g_return_if_fail(service
!= NULL
);
1637 g_return_if_fail(context
!= NULL
);
/* context is the service_dfsm_private_t holding the dfsm pointer */
1639 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1640 dfsm_t
*dfsm
= private->dfsm
;
1642 g_return_if_fail(dfsm
!= NULL
);
1644 dfsm_verify_request(dfsm
);
/*
 * Callback table that plugs the service_dfsm_* functions into the
 * cfs_service framework; service_dfsm_new() passes it to cfs_service_new().
 */
1647 static cfs_service_callbacks_t cfs_dfsm_callbacks
= {
1648 .cfs_service_initialize_fn
= service_dfsm_initialize
,
1649 .cfs_service_finalize_fn
= service_dfsm_finalize
,
1650 .cfs_service_dispatch_fn
= service_dfsm_dispatch
,
1651 .cfs_service_timer_fn
= service_dfsm_timer
,
/*
 * Create a cfs_service_t wrapping the given state machine.
 *
 * A NULL dfsm yields NULL through g_return_val_if_fail(). A zero-initialized
 * service_dfsm_private_t is allocated with g_new0(), pointed at dfsm, and
 * handed to cfs_service_new() as the service context together with
 * cfs_dfsm_callbacks and dfsm->log_domain.
 *
 * NOTE(review): the lines between the allocation and its use, and the
 * return statement, are not visible in this view; presumably the new
 * service is returned - confirm.
 */
1655 service_dfsm_new(dfsm_t
*dfsm
)
1657 cfs_service_t
*service
;
1659 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
1661 service_dfsm_private_t
*private = g_new0(service_dfsm_private_t
, 1);
/* the private context only stores the pointer to dfsm */
1665 private->dfsm
= dfsm
;
1667 service
= cfs_service_new(&cfs_dfsm_callbacks
, dfsm
->log_domain
, private);
1673 service_dfsm_destroy(cfs_service_t
*service
)
1675 g_return_if_fail(service
!= NULL
);
1677 service_dfsm_private_t
*private =
1678 (service_dfsm_private_t
*)cfs_service_get_context(service
);