2 Copyright (C) 2010 Proxmox Server Solutions GmbH
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 Author: Dietmar Maurer <dietmar@proxmox.com>
22 /* NOTE: we try to keep the CPG handle as long as possible, because
23 * calling cpg_initialize/cpg_finalize multiple times from the
24 * same process confuses corosync.
25 * Note: CS_ERR_LIBRARY is returned when corosync has died. */
30 #endif /* HAVE_CONFIG_H */
32 #include <sys/types.h>
37 #include <corosync/corotypes.h>
38 #include <corosync/cpg.h>
41 #include "cfs-utils.h"
44 static cpg_callbacks_t cpg_callbacks
;
48 DFSM_MODE_START_SYNC
= 1,
52 /* values >= 128 indicate abnormal/error conditions */
53 DFSM_ERROR_MODE_START
= 128,
54 DFSM_MODE_LEAVE
= 253,
55 DFSM_MODE_VERSION_ERROR
= 254,
56 DFSM_MODE_ERROR
= 255,
60 DFSM_MESSAGE_NORMAL
= 0,
61 DFSM_MESSAGE_SYNC_START
= 1,
62 DFSM_MESSAGE_STATE
= 2,
63 DFSM_MESSAGE_UPDATE
= 3,
64 DFSM_MESSAGE_UPDATE_COMPLETE
= 4,
65 DFSM_MESSAGE_VERIFY_REQUEST
= 5,
66 DFSM_MESSAGE_VERIFY
= 6,
69 #define DFSM_VALID_STATE_MESSAGE(mt) (mt >= DFSM_MESSAGE_SYNC_START && mt <= DFSM_MESSAGE_VERIFY)
74 uint32_t protocol_version
;
77 } dfsm_message_header_t
;
80 uint32_t epoch
; // per process (not globally unique)
87 dfsm_message_header_t base
;
88 dfsm_sync_epoch_t epoch
;
89 } dfsm_message_state_header_t
;
92 dfsm_message_header_t base
;
94 } dfsm_message_normal_header_t
;
101 int msg_len
; // fixme: unsigned?
102 } dfsm_queued_message_t
;
106 cpg_callbacks_t
*cpg_callbacks
;
107 dfsm_callbacks_t
*dfsm_callbacks
;
108 cpg_handle_t cpg_handle
;
109 struct cpg_name cpg_group_name
;
114 guint32 protocol_version
;
119 /* mode is protected with mode_mutex */
123 GHashTable
*members
; /* contains dfsm_node_info_t pointers */
124 dfsm_sync_info_t
*sync_info
;
125 uint32_t local_epoch_counter
;
126 dfsm_sync_epoch_t sync_epoch
;
127 uint32_t lowest_nodeid
;
128 GSequence
*msg_queue
;
131 /* synchronous message transmission, protected with sync_mutex */
136 uint64_t msgcount_rcvd
;
138 /* state verification */
140 dfsm_sync_epoch_t csum_epoch
;
142 uint64_t csum_counter
;
145 static gboolean
dfsm_deliver_queue(dfsm_t
*dfsm
);
146 static gboolean
dfsm_deliver_sync_queue(dfsm_t
*dfsm
);
149 dfsm_nodeid_is_local(
154 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
156 return (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
);
161 dfsm_send_sync_message_abort(dfsm_t
*dfsm
)
163 g_return_if_fail(dfsm
!= NULL
);
164 g_return_if_fail(dfsm
->sync_mutex
!= NULL
);
165 g_return_if_fail(dfsm
->sync_cond
!= NULL
);
167 g_mutex_lock (dfsm
->sync_mutex
);
168 dfsm
->msgcount_rcvd
= dfsm
->msgcount
;
169 g_cond_broadcast (dfsm
->sync_cond
);
170 g_mutex_unlock (dfsm
->sync_mutex
);
174 dfsm_record_local_result(
180 g_return_if_fail(dfsm
!= NULL
);
181 g_return_if_fail(dfsm
->sync_mutex
!= NULL
);
182 g_return_if_fail(dfsm
->sync_cond
!= NULL
);
183 g_return_if_fail(dfsm
->results
!= NULL
);
185 g_mutex_lock (dfsm
->sync_mutex
);
186 dfsm_result_t
*rp
= (dfsm_result_t
*)g_hash_table_lookup(dfsm
->results
, &msg_count
);
188 rp
->result
= msg_result
;
189 rp
->processed
= processed
;
191 dfsm
->msgcount_rcvd
= msg_count
;
192 g_cond_broadcast (dfsm
->sync_cond
);
193 g_mutex_unlock (dfsm
->sync_mutex
);
197 dfsm_send_message_full(
203 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
204 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
206 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
210 result
= cpg_mcast_joined(dfsm
->cpg_handle
, CPG_TYPE_AGREED
, iov
, len
);
211 if (retry
&& result
== CPG_ERR_TRY_AGAIN
) {
212 nanosleep(&tvreq
, NULL
);
214 if ((retries
% 10) == 0)
215 cfs_dom_message(dfsm
->log_domain
, "cpg_send_message retry %d", retries
);
221 cfs_dom_message(dfsm
->log_domain
, "cpg_send_message retried %d times", retries
);
223 if (result
!= CS_OK
&&
224 (!retry
|| result
!= CPG_ERR_TRY_AGAIN
))
225 cfs_dom_critical(dfsm
->log_domain
, "cpg_send_message failed: %d", result
);
231 dfsm_send_state_message_full(
237 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
238 g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type
), CS_ERR_INVALID_PARAM
);
239 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
241 dfsm_message_state_header_t header
;
242 header
.base
.type
= type
;
243 header
.base
.subtype
= 0;
244 header
.base
.protocol_version
= dfsm
->protocol_version
;
245 header
.base
.time
= time(NULL
);
246 header
.base
.reserved
= 0;
248 header
.epoch
= dfsm
->sync_epoch
;
250 struct iovec real_iov
[len
+ 1];
252 real_iov
[0].iov_base
= (char *)&header
;
253 real_iov
[0].iov_len
= sizeof(header
);
255 for (int i
= 0; i
< len
; i
++)
256 real_iov
[i
+ 1] = iov
[i
];
258 return dfsm_send_message_full(dfsm
, real_iov
, len
+ 1, 1);
267 return dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_UPDATE
, iov
, len
);
271 dfsm_send_update_complete(dfsm_t
*dfsm
)
273 return dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_UPDATE_COMPLETE
, NULL
, 0);
284 return dfsm_send_message_sync(dfsm
, msgtype
, iov
, len
, NULL
);
288 dfsm_send_message_sync(
295 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
296 g_return_val_if_fail(dfsm
->sync_mutex
!= NULL
, CS_ERR_INVALID_PARAM
);
297 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
299 g_mutex_lock (dfsm
->sync_mutex
);
300 /* note: hold lock until message is sent - to guarantee ordering */
301 uint64_t msgcount
= ++dfsm
->msgcount
;
303 rp
->msgcount
= msgcount
;
305 g_hash_table_replace(dfsm
->results
, &rp
->msgcount
, rp
);
308 dfsm_message_normal_header_t header
;
309 header
.base
.type
= DFSM_MESSAGE_NORMAL
;
310 header
.base
.subtype
= msgtype
;
311 header
.base
.protocol_version
= dfsm
->protocol_version
;
312 header
.base
.time
= time(NULL
);
313 header
.base
.reserved
= 0;
314 header
.count
= msgcount
;
316 struct iovec real_iov
[len
+ 1];
318 real_iov
[0].iov_base
= (char *)&header
;
319 real_iov
[0].iov_len
= sizeof(header
);
321 for (int i
= 0; i
< len
; i
++)
322 real_iov
[i
+ 1] = iov
[i
];
324 cpg_error_t result
= dfsm_send_message_full(dfsm
, real_iov
, len
+ 1, 1);
326 g_mutex_unlock (dfsm
->sync_mutex
);
328 if (result
!= CS_OK
) {
329 cfs_dom_critical(dfsm
->log_domain
, "cpg_send_message failed: %d", result
);
332 g_mutex_lock (dfsm
->sync_mutex
);
333 g_hash_table_remove(dfsm
->results
, &rp
->msgcount
);
334 g_mutex_unlock (dfsm
->sync_mutex
);
340 g_mutex_lock (dfsm
->sync_mutex
);
342 while (dfsm
->msgcount_rcvd
< msgcount
)
343 g_cond_wait (dfsm
->sync_cond
, dfsm
->sync_mutex
);
346 g_hash_table_remove(dfsm
->results
, &rp
->msgcount
);
348 g_mutex_unlock (dfsm
->sync_mutex
);
350 return rp
->processed
? CS_OK
: CS_ERR_FAILED_OPERATION
;
357 dfsm_send_checksum(dfsm_t
*dfsm
)
359 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
362 struct iovec iov
[len
];
364 iov
[0].iov_base
= (char *)&dfsm
->csum_id
;
365 iov
[0].iov_len
= sizeof(dfsm
->csum_id
);
366 iov
[1].iov_base
= dfsm
->csum
;
367 iov
[1].iov_len
= sizeof(dfsm
->csum
);
369 gboolean res
= (dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_VERIFY
, iov
, len
) == CS_OK
);
375 dfsm_free_queue_entry(gpointer data
)
377 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)data
;
383 dfsm_free_message_queue(dfsm_t
*dfsm
)
385 g_return_if_fail(dfsm
!= NULL
);
386 g_return_if_fail(dfsm
->msg_queue
!= NULL
);
388 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
389 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
390 while (iter
!= end
) {
391 GSequenceIter
*cur
= iter
;
392 iter
= g_sequence_iter_next(iter
);
393 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
395 dfsm_free_queue_entry(qm
);
396 g_sequence_remove(cur
);
401 dfsm_free_sync_queue(dfsm_t
*dfsm
)
403 g_return_if_fail(dfsm
!= NULL
);
405 GList
*iter
= dfsm
->sync_queue
;
407 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)iter
->data
;
408 iter
= g_list_next(iter
);
409 dfsm_free_queue_entry(qm
);
412 g_list_free(dfsm
->sync_queue
);
413 dfsm
->sync_queue
= NULL
;
417 message_queue_sort_fn(
422 return ((dfsm_queued_message_t
*)a
)->msg_count
-
423 ((dfsm_queued_message_t
*)b
)->msg_count
;
426 static dfsm_node_info_t
*
427 dfsm_node_info_lookup(
432 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
433 g_return_val_if_fail(dfsm
->members
!= NULL
, NULL
);
435 dfsm_node_info_t info
= { .nodeid
= nodeid
, .pid
= pid
};
437 return (dfsm_node_info_t
*)g_hash_table_lookup(dfsm
->members
, &info
);
440 static dfsm_queued_message_t
*
441 dfsm_queue_add_message(
449 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
450 g_return_val_if_fail(msg
!= NULL
, NULL
);
451 g_return_val_if_fail(msg_len
!= 0, NULL
);
453 dfsm_node_info_t
*ni
= dfsm_node_info_lookup(dfsm
, nodeid
, pid
);
455 cfs_dom_critical(dfsm
->log_domain
, "dfsm_node_info_lookup failed");
459 dfsm_queued_message_t
*qm
= g_new0(dfsm_queued_message_t
, 1);
460 g_return_val_if_fail(qm
!= NULL
, NULL
);
464 qm
->msg
= g_memdup (msg
, msg_len
);
465 qm
->msg_len
= msg_len
;
466 qm
->msg_count
= msg_count
;
468 if (dfsm
->mode
== DFSM_MODE_UPDATE
&& ni
->synced
) {
469 dfsm
->sync_queue
= g_list_append(dfsm
->sync_queue
, qm
);
471 /* NOTE: we only need to keep the queue sorted because we sometimes
472 * resend all queued messages. */
474 g_sequence_insert_sorted(dfsm
->msg_queue
, qm
, message_queue_sort_fn
, NULL
);
481 dfsm_sync_info_hash(gconstpointer key
)
483 dfsm_node_info_t
*info
= (dfsm_node_info_t
*)key
;
485 return g_int_hash(&info
->nodeid
) + g_int_hash(&info
->pid
);
489 dfsm_sync_info_equal(
493 dfsm_node_info_t
*info1
= (dfsm_node_info_t
*)v1
;
494 dfsm_node_info_t
*info2
= (dfsm_node_info_t
*)v2
;
496 if (info1
->nodeid
== info2
->nodeid
&&
497 info1
->pid
== info2
->pid
)
504 dfsm_sync_info_compare(
508 dfsm_node_info_t
*info1
= (dfsm_node_info_t
*)v1
;
509 dfsm_node_info_t
*info2
= (dfsm_node_info_t
*)v2
;
511 if (info1
->nodeid
!= info2
->nodeid
)
512 return info1
->nodeid
- info2
->nodeid
;
514 return info1
->pid
- info2
->pid
;
520 dfsm_mode_t new_mode
)
522 g_return_if_fail(dfsm
!= NULL
);
524 cfs_debug("dfsm_set_mode - set mode to %d", new_mode
);
527 g_mutex_lock (dfsm
->mode_mutex
);
528 if (dfsm
->mode
!= new_mode
) {
529 if (new_mode
< DFSM_ERROR_MODE_START
||
530 (dfsm
->mode
< DFSM_ERROR_MODE_START
|| new_mode
>= dfsm
->mode
)) {
531 dfsm
->mode
= new_mode
;
535 g_mutex_unlock (dfsm
->mode_mutex
);
540 if (new_mode
== DFSM_MODE_START
) {
541 cfs_dom_message(dfsm
->log_domain
, "start cluster connection");
542 } else if (new_mode
== DFSM_MODE_START_SYNC
) {
543 cfs_dom_message(dfsm
->log_domain
, "starting data syncronisation");
544 } else if (new_mode
== DFSM_MODE_SYNCED
) {
545 cfs_dom_message(dfsm
->log_domain
, "all data is up to date");
546 if (dfsm
->dfsm_callbacks
->dfsm_synced_fn
)
547 dfsm
->dfsm_callbacks
->dfsm_synced_fn(dfsm
);
548 } else if (new_mode
== DFSM_MODE_UPDATE
) {
549 cfs_dom_message(dfsm
->log_domain
, "waiting for updates from leader");
550 } else if (new_mode
== DFSM_MODE_LEAVE
) {
551 cfs_dom_critical(dfsm
->log_domain
, "leaving CPG group");
552 } else if (new_mode
== DFSM_MODE_ERROR
) {
553 cfs_dom_critical(dfsm
->log_domain
, "serious internal error - stop cluster connection");
554 } else if (new_mode
== DFSM_MODE_VERSION_ERROR
) {
555 cfs_dom_critical(dfsm
->log_domain
, "detected newer protocol - please update this node");
560 dfsm_get_mode(dfsm_t
*dfsm
)
562 g_return_val_if_fail(dfsm
!= NULL
, DFSM_MODE_ERROR
);
564 g_mutex_lock (dfsm
->mode_mutex
);
565 dfsm_mode_t mode
= dfsm
->mode
;
566 g_mutex_unlock (dfsm
->mode_mutex
);
572 dfsm_restartable(dfsm_t
*dfsm
)
574 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
576 return !(mode
== DFSM_MODE_ERROR
||
577 mode
== DFSM_MODE_VERSION_ERROR
);
581 dfsm_set_errormode(dfsm_t
*dfsm
)
583 dfsm_set_mode(dfsm
, DFSM_MODE_ERROR
);
587 dfsm_release_sync_resources(
589 const struct cpg_address
*member_list
,
590 size_t member_list_entries
)
592 g_return_if_fail(dfsm
!= NULL
);
593 g_return_if_fail(dfsm
->members
!= NULL
);
594 g_return_if_fail(!member_list_entries
|| member_list
!= NULL
);
596 cfs_debug("enter dfsm_release_sync_resources");
598 if (dfsm
->sync_info
) {
600 if (dfsm
->sync_info
->data
&& dfsm
->dfsm_callbacks
->dfsm_cleanup_fn
) {
601 dfsm
->dfsm_callbacks
->dfsm_cleanup_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
602 dfsm
->sync_info
->data
= NULL
;
605 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++) {
606 if (dfsm
->sync_info
->nodes
[i
].state
) {
607 g_free(dfsm
->sync_info
->nodes
[i
].state
);
608 dfsm
->sync_info
->nodes
[i
].state
= NULL
;
609 dfsm
->sync_info
->nodes
[i
].state_len
= 0;
616 g_hash_table_remove_all(dfsm
->members
);
619 g_free(dfsm
->sync_info
);
621 int size
= sizeof(dfsm_sync_info_t
) +
622 member_list_entries
*sizeof(dfsm_sync_info_t
);
623 dfsm_sync_info_t
*sync_info
= dfsm
->sync_info
= g_malloc0(size
);
624 sync_info
->node_count
= member_list_entries
;
626 for (int i
= 0; i
< member_list_entries
; i
++) {
627 sync_info
->nodes
[i
].nodeid
= member_list
[i
].nodeid
;
628 sync_info
->nodes
[i
].pid
= member_list
[i
].pid
;
631 qsort(sync_info
->nodes
, member_list_entries
, sizeof(dfsm_node_info_t
),
632 dfsm_sync_info_compare
);
634 for (int i
= 0; i
< member_list_entries
; i
++) {
635 dfsm_node_info_t
*info
= &sync_info
->nodes
[i
];
636 g_hash_table_insert(dfsm
->members
, info
, info
);
637 if (info
->nodeid
== dfsm
->nodeid
&& info
->pid
== dfsm
->pid
)
638 sync_info
->local
= info
;
644 dfsm_cpg_deliver_callback(
646 const struct cpg_name
*group_name
,
655 result
= cpg_context_get(handle
, (gpointer
*)&dfsm
);
656 if (result
!= CS_OK
|| !dfsm
|| dfsm
->cpg_callbacks
!= &cpg_callbacks
) {
657 cfs_critical("cpg_context_get error: %d (%p)", result
, dfsm
);
658 return; /* we have no valid dfsm pointer, so we can just ignore this */
660 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
662 cfs_dom_debug(dfsm
->log_domain
, "dfsm mode is %d", mode
);
664 if (mode
>= DFSM_ERROR_MODE_START
) {
665 cfs_dom_debug(dfsm
->log_domain
, "error mode - ignoring message");
669 if (!dfsm
->sync_info
) {
670 cfs_dom_critical(dfsm
->log_domain
, "no dfsm_sync_info - internal error");
674 if (msg_len
< sizeof(dfsm_message_header_t
)) {
675 cfs_dom_critical(dfsm
->log_domain
, "received short message (%ld bytes)", msg_len
);
679 dfsm_message_header_t
*base_header
= (dfsm_message_header_t
*)msg
;
681 if (base_header
->protocol_version
> dfsm
->protocol_version
) {
682 cfs_dom_critical(dfsm
->log_domain
, "received message with protocol version %d",
683 base_header
->protocol_version
);
684 dfsm_set_mode(dfsm
, DFSM_MODE_VERSION_ERROR
);
686 } else if (base_header
->protocol_version
< dfsm
->protocol_version
) {
687 cfs_dom_message(dfsm
->log_domain
, "ignore message with wrong protocol version %d",
688 base_header
->protocol_version
);
692 if (base_header
->type
== DFSM_MESSAGE_NORMAL
) {
694 dfsm_message_normal_header_t
*header
= (dfsm_message_normal_header_t
*)msg
;
696 if (msg_len
< sizeof(dfsm_message_normal_header_t
)) {
697 cfs_dom_critical(dfsm
->log_domain
, "received short message (type = %d, subtype = %d, %ld bytes)",
698 base_header
->type
, base_header
->subtype
, msg_len
);
702 if (mode
!= DFSM_MODE_SYNCED
) {
703 cfs_dom_debug(dfsm
->log_domain
, "queue message %zu (subtype = %d, length = %ld)",
704 header
->count
, base_header
->subtype
, msg_len
);
706 if (!dfsm_queue_add_message(dfsm
, nodeid
, pid
, header
->count
, msg
, msg_len
))
711 int res
= dfsm
->dfsm_callbacks
->dfsm_deliver_fn(
712 dfsm
, dfsm
->data
, &msg_res
, nodeid
, pid
, base_header
->subtype
,
713 base_header
->time
, msg
+ sizeof(dfsm_message_normal_header_t
),
714 msg_len
- sizeof(dfsm_message_normal_header_t
));
716 if (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
)
717 dfsm_record_local_result(dfsm
, header
->count
, msg_res
, res
);
726 /* state related messages:
727 * we need the right epoch - otherwise we simply discard the message. */
730 dfsm_message_state_header_t
*header
= (dfsm_message_state_header_t
*)msg
;
732 if (msg_len
< sizeof(dfsm_message_state_header_t
)) {
733 cfs_dom_critical(dfsm
->log_domain
, "received short state message (type = %d, subtype = %d, %ld bytes)",
734 base_header
->type
, base_header
->subtype
, msg_len
);
738 if (base_header
->type
!= DFSM_MESSAGE_SYNC_START
&&
739 (memcmp(&header
->epoch
, &dfsm
->sync_epoch
, sizeof(dfsm_sync_epoch_t
)) != 0)) {
740 cfs_dom_debug(dfsm
->log_domain
, "ignore message (msg_type == %d) with "
741 "wrong epoch (epoch %d/%d/%08X)", base_header
->type
,
742 header
->epoch
.nodeid
, header
->epoch
.pid
, header
->epoch
.epoch
);
746 msg
+= sizeof(dfsm_message_state_header_t
);
747 msg_len
-= sizeof(dfsm_message_state_header_t
);
749 if (mode
== DFSM_MODE_SYNCED
) {
750 if (base_header
->type
== DFSM_MESSAGE_UPDATE_COMPLETE
) {
752 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++)
753 dfsm
->sync_info
->nodes
[i
].synced
= 1;
755 if (!dfsm_deliver_queue(dfsm
))
760 } else if (base_header
->type
== DFSM_MESSAGE_VERIFY_REQUEST
) {
762 if (msg_len
!= sizeof(dfsm
->csum_counter
)) {
763 cfs_dom_critical(dfsm
->log_domain
, "cpg received verify request with wrong length (%ld bytes) form node %d/%d", msg_len
, nodeid
, pid
);
767 uint64_t csum_id
= *((uint64_t *)msg
);
768 msg
+= 8; msg_len
-= 8;
770 cfs_dom_debug(dfsm
->log_domain
, "got verify request from node %d %016zX", nodeid
, csum_id
);
772 if (dfsm
->dfsm_callbacks
->dfsm_checksum_fn
) {
773 if (!dfsm
->dfsm_callbacks
->dfsm_checksum_fn(
774 dfsm
, dfsm
->data
, dfsm
->csum
, sizeof(dfsm
->csum
))) {
775 cfs_dom_critical(dfsm
->log_domain
, "unable to compute data checksum");
779 dfsm
->csum_epoch
= header
->epoch
;
780 dfsm
->csum_id
= csum_id
;
782 if (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
) {
783 if (!dfsm_send_checksum(dfsm
))
790 } else if (base_header
->type
== DFSM_MESSAGE_VERIFY
) {
792 cfs_dom_debug(dfsm
->log_domain
, "received verify message");
794 if (dfsm
->dfsm_callbacks
->dfsm_checksum_fn
) {
796 if (msg_len
!= (sizeof(dfsm
->csum_id
) + sizeof(dfsm
->csum
))) {
797 cfs_dom_critical(dfsm
->log_domain
, "cpg received verify message with wrong length (%ld bytes)", msg_len
);
801 uint64_t csum_id
= *((uint64_t *)msg
);
802 msg
+= 8; msg_len
-= 8;
804 if (dfsm
->csum_id
== csum_id
&&
805 (memcmp(&dfsm
->csum_epoch
, &header
->epoch
, sizeof(dfsm_sync_epoch_t
)) == 0)) {
806 if (memcmp(msg
, dfsm
->csum
, sizeof(dfsm
->csum
)) != 0) {
807 cfs_dom_critical(dfsm
->log_domain
, "wrong checksum %016zX != %016zX - restarting",
808 *(uint64_t *)msg
, *(uint64_t *)dfsm
->csum
);
811 cfs_dom_message(dfsm
->log_domain
, "data verification successful");
814 cfs_dom_message(dfsm
->log_domain
, "skip verification - no checksum saved");
821 /* ignore (we already got all required updates, or we are leader) */
822 cfs_dom_debug(dfsm
->log_domain
, "ignore state sync message %d",
827 } else if (mode
== DFSM_MODE_START_SYNC
) {
829 if (base_header
->type
== DFSM_MESSAGE_SYNC_START
) {
831 if (nodeid
!= dfsm
->lowest_nodeid
) {
832 cfs_dom_critical(dfsm
->log_domain
, "ignore sync request from wrong member %d/%d",
836 cfs_dom_message(dfsm
->log_domain
, "received sync request (epoch %d/%d/%08X)",
837 header
->epoch
.nodeid
, header
->epoch
.pid
, header
->epoch
.epoch
);
839 dfsm
->sync_epoch
= header
->epoch
;
841 dfsm_release_sync_resources(dfsm
, NULL
, 0);
843 unsigned int state_len
= 0;
844 gpointer state
= NULL
;
846 state
= dfsm
->dfsm_callbacks
->dfsm_get_state_fn(dfsm
, dfsm
->data
, &state_len
);
848 if (!(state
&& state_len
)) {
849 cfs_dom_critical(dfsm
->log_domain
, "dfsm_get_state_fn failed");
854 iov
[0].iov_base
= state
;
855 iov
[0].iov_len
= state_len
;
857 result
= dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_STATE
, iov
, 1);
867 } else if (base_header
->type
== DFSM_MESSAGE_STATE
) {
869 dfsm_node_info_t
*ni
;
871 if (!(ni
= dfsm_node_info_lookup(dfsm
, nodeid
, pid
))) {
872 cfs_dom_critical(dfsm
->log_domain
, "received state for non-member %d/%d", nodeid
, pid
);
877 cfs_dom_critical(dfsm
->log_domain
, "received duplicate state for member %d/%d", nodeid
, pid
);
881 ni
->state
= g_memdup(msg
, msg_len
);
882 ni
->state_len
= msg_len
;
884 int received_all
= 1;
885 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++) {
886 if (!dfsm
->sync_info
->nodes
[i
].state
) {
893 cfs_dom_message(dfsm
->log_domain
, "received all states");
895 int res
= dfsm
->dfsm_callbacks
->dfsm_process_state_update_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
899 if (dfsm
->sync_info
->local
->synced
) {
900 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
901 dfsm_release_sync_resources(dfsm
, NULL
, 0);
903 if (!dfsm_deliver_queue(dfsm
))
907 dfsm_set_mode(dfsm
, DFSM_MODE_UPDATE
);
909 if (!dfsm_deliver_queue(dfsm
))
918 } else if (mode
== DFSM_MODE_UPDATE
) {
920 if (base_header
->type
== DFSM_MESSAGE_UPDATE
) {
922 int res
= dfsm
->dfsm_callbacks
->dfsm_process_update_fn(
923 dfsm
, dfsm
->data
, dfsm
->sync_info
, nodeid
, pid
, msg
, msg_len
);
930 } else if (base_header
->type
== DFSM_MESSAGE_UPDATE_COMPLETE
) {
933 int res
= dfsm
->dfsm_callbacks
->dfsm_commit_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
938 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++)
939 dfsm
->sync_info
->nodes
[i
].synced
= 1;
941 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
943 if (!dfsm_deliver_sync_queue(dfsm
))
946 if (!dfsm_deliver_queue(dfsm
))
949 dfsm_release_sync_resources(dfsm
, NULL
, 0);
955 cfs_dom_critical(dfsm
->log_domain
, "internal error - unknown mode %d", mode
);
959 if (base_header
->type
== DFSM_MESSAGE_VERIFY_REQUEST
||
960 base_header
->type
== DFSM_MESSAGE_VERIFY
) {
962 cfs_dom_debug(dfsm
->log_domain
, "ignore verify message %d while not synced", base_header
->type
);
965 cfs_dom_critical(dfsm
->log_domain
, "received unknown state message type (type = %d, %ld bytes)",
966 base_header
->type
, msg_len
);
971 dfsm_set_mode(dfsm
, DFSM_MODE_LEAVE
);
972 dfsm_release_sync_resources(dfsm
, NULL
, 0);
977 dfsm_resend_queue(dfsm_t
*dfsm
)
979 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
980 g_return_val_if_fail(dfsm
->msg_queue
!= NULL
, FALSE
);
982 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
983 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
986 while (iter
!= end
) {
987 GSequenceIter
*cur
= iter
;
988 iter
= g_sequence_iter_next(iter
);
990 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
993 if (qm
->nodeid
== dfsm
->nodeid
&& qm
->pid
== dfsm
->pid
) {
996 iov
[0].iov_base
= qm
->msg
;
997 iov
[0].iov_len
= qm
->msg_len
;
999 if ((result
= dfsm_send_message_full(dfsm
, iov
, 1, 1)) != CS_OK
) {
1006 dfsm_free_message_queue(dfsm
);
1012 dfsm_deliver_sync_queue(dfsm_t
*dfsm
)
1014 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1016 if (!dfsm
->sync_queue
)
1019 gboolean res
= TRUE
;
1022 cfs_dom_message(dfsm
->log_domain
, "%s: queue length %d", __func__
,
1023 g_list_length(dfsm
->sync_queue
));
1025 GList
*iter
= dfsm
->sync_queue
;
1027 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)iter
->data
;
1028 iter
= g_list_next(iter
);
1030 if (res
&& dfsm
->mode
== DFSM_MODE_SYNCED
) {
1031 dfsm_cpg_deliver_callback(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
,
1032 qm
->nodeid
, qm
->pid
, qm
->msg
, qm
->msg_len
);
1037 dfsm_free_queue_entry(qm
);
1039 g_list_free(dfsm
->sync_queue
);
1040 dfsm
->sync_queue
= NULL
;
1046 dfsm_deliver_queue(dfsm_t
*dfsm
)
1048 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1049 g_return_val_if_fail(dfsm
->msg_queue
!= NULL
, FALSE
);
1051 int qlen
= g_sequence_get_length(dfsm
->msg_queue
);
1055 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
1056 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
1057 gboolean res
= TRUE
;
1060 cfs_dom_message(dfsm
->log_domain
, "%s: queue length %d", __func__
, qlen
);
1062 while (iter
!= end
) {
1063 GSequenceIter
*cur
= iter
;
1064 iter
= g_sequence_iter_next(iter
);
1066 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
1067 g_sequence_get(cur
);
1069 dfsm_node_info_t
*ni
= dfsm_node_info_lookup(dfsm
, qm
->nodeid
, qm
->pid
);
1071 cfs_dom_message(dfsm
->log_domain
, "remove mesage from non-member %d/%d",
1072 qm
->nodeid
, qm
->pid
);
1073 dfsm_free_queue_entry(qm
);
1074 g_sequence_remove(cur
);
1078 if (dfsm
->mode
== DFSM_MODE_SYNCED
) {
1080 dfsm_cpg_deliver_callback(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
,
1081 qm
->nodeid
, qm
->pid
, qm
->msg
, qm
->msg_len
);
1082 dfsm_free_queue_entry(qm
);
1083 g_sequence_remove(cur
);
1085 } else if (dfsm
->mode
== DFSM_MODE_UPDATE
) {
1087 dfsm
->sync_queue
= g_list_append(dfsm
->sync_queue
, qm
);
1088 g_sequence_remove(cur
);
1100 dfsm_cpg_confchg_callback(
1101 cpg_handle_t handle
,
1102 const struct cpg_name
*group_name
,
1103 const struct cpg_address
*member_list
,
1104 size_t member_list_entries
,
1105 const struct cpg_address
*left_list
,
1106 size_t left_list_entries
,
1107 const struct cpg_address
*joined_list
,
1108 size_t joined_list_entries
)
1112 dfsm_t
*dfsm
= NULL
;
1113 result
= cpg_context_get(handle
, (gpointer
*)&dfsm
);
1114 if (result
!= CS_OK
|| !dfsm
|| dfsm
->cpg_callbacks
!= &cpg_callbacks
) {
1115 cfs_critical("cpg_context_get error: %d (%p)", result
, dfsm
);
1116 return; /* we have no valid dfsm pointer, so we can just ignore this */
1119 dfsm
->we_are_member
= 0;
1121 /* create new epoch */
1122 dfsm
->local_epoch_counter
++;
1123 dfsm
->sync_epoch
.epoch
= dfsm
->local_epoch_counter
;
1124 dfsm
->sync_epoch
.nodeid
= dfsm
->nodeid
;
1125 dfsm
->sync_epoch
.pid
= dfsm
->pid
;
1126 dfsm
->sync_epoch
.time
= time(NULL
);
1128 /* invalidate saved checksum */
1129 dfsm
->csum_id
= dfsm
->csum_counter
;
1130 memset(&dfsm
->csum_epoch
, 0, sizeof(dfsm
->csum_epoch
));
1132 dfsm_free_sync_queue(dfsm
);
1134 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1136 cfs_dom_debug(dfsm
->log_domain
, "dfsm mode is %d", mode
);
1138 if (mode
>= DFSM_ERROR_MODE_START
) {
1139 cfs_dom_debug(dfsm
->log_domain
, "already left group - ignore message");
1143 int lowest_nodeid
= 0;
1144 GString
*str
= g_string_new("members: ");
1145 for (int i
= 0; i
< member_list_entries
; i
++) {
1147 g_string_append_printf(str
, i
? ", %d/%d" : "%d/%d",
1148 member_list
[i
].nodeid
, member_list
[i
].pid
);
1150 if (lowest_nodeid
== 0 || lowest_nodeid
> member_list
[i
].nodeid
)
1151 lowest_nodeid
= member_list
[i
].nodeid
;
1153 if (member_list
[i
].nodeid
== dfsm
->nodeid
&&
1154 member_list
[i
].pid
== dfsm
->pid
)
1155 dfsm
->we_are_member
= 1;
1159 if ((dfsm
->we_are_member
|| mode
!= DFSM_MODE_START
))
1160 cfs_dom_message(dfsm
->log_domain
, str
->str
);
1162 g_string_free(str
, 1);
1164 dfsm
->lowest_nodeid
= lowest_nodeid
;
1166 /* NOTE: one node can be in left and joined list at the same time,
1167 so it is better to query member list. Also JOIN/LEAVE list are
1168 different on different nodes!
1171 dfsm_release_sync_resources(dfsm
, member_list
, member_list_entries
);
1173 if (!dfsm
->we_are_member
) {
1174 if (mode
== DFSM_MODE_START
) {
1175 cfs_dom_debug(dfsm
->log_domain
, "ignore leave message");
1178 cfs_dom_message(dfsm
->log_domain
, "we (%d/%d) left the process group",
1179 dfsm
->nodeid
, dfsm
->pid
);
1183 if (member_list_entries
> 1) {
1185 int qlen
= g_sequence_get_length(dfsm
->msg_queue
);
1186 if (joined_list_entries
&& qlen
) {
1187 /* we need to make sure that all members have the same queue. */
1188 cfs_dom_message(dfsm
->log_domain
, "queue not emtpy - resening %d messages", qlen
);
1189 if (!dfsm_resend_queue(dfsm
)) {
1190 cfs_dom_critical(dfsm
->log_domain
, "dfsm_resend_queue failed");
1195 dfsm_set_mode(dfsm
, DFSM_MODE_START_SYNC
);
1196 if (lowest_nodeid
== dfsm
->nodeid
) {
1197 if (!dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_SYNC_START
, NULL
, 0)) {
1198 cfs_dom_critical(dfsm
->log_domain
, "failed to send SYNC_START message");
1203 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
1204 dfsm
->sync_info
->local
->synced
= 1;
1205 if (!dfsm_deliver_queue(dfsm
))
1209 if (dfsm
->dfsm_callbacks
->dfsm_confchg_fn
)
1210 dfsm
->dfsm_callbacks
->dfsm_confchg_fn(dfsm
, dfsm
->data
, member_list
, member_list_entries
);
1214 dfsm_set_mode(dfsm
, DFSM_MODE_LEAVE
);
1218 static cpg_callbacks_t cpg_callbacks
= {
1219 .cpg_deliver_fn
= dfsm_cpg_deliver_callback
,
1220 .cpg_confchg_fn
= dfsm_cpg_confchg_callback
,
1226 const char *group_name
,
1227 const char *log_domain
,
1228 guint32 protocol_version
,
1229 dfsm_callbacks_t
*callbacks
)
1231 g_return_val_if_fail(sizeof(dfsm_message_header_t
) == 16, NULL
);
1232 g_return_val_if_fail(sizeof(dfsm_message_state_header_t
) == 32, NULL
);
1233 g_return_val_if_fail(sizeof(dfsm_message_normal_header_t
) == 24, NULL
);
1235 g_return_val_if_fail(callbacks
!= NULL
, NULL
);
1236 g_return_val_if_fail(callbacks
->dfsm_deliver_fn
!= NULL
, NULL
);
1238 g_return_val_if_fail(callbacks
->dfsm_get_state_fn
!= NULL
, NULL
);
1239 g_return_val_if_fail(callbacks
->dfsm_process_state_update_fn
!= NULL
, NULL
);
1240 g_return_val_if_fail(callbacks
->dfsm_process_update_fn
!= NULL
, NULL
);
1241 g_return_val_if_fail(callbacks
->dfsm_commit_fn
!= NULL
, NULL
);
1245 if ((dfsm
= g_new0(dfsm_t
, 1)) == NULL
)
1248 if (!(dfsm
->sync_mutex
= g_mutex_new()))
1251 if (!(dfsm
->sync_cond
= g_cond_new()))
1254 if (!(dfsm
->results
= g_hash_table_new(g_int64_hash
, g_int64_equal
)))
1257 if (!(dfsm
->msg_queue
= g_sequence_new(NULL
)))
1260 dfsm
->log_domain
= g_strdup(log_domain
);
1262 dfsm
->mode
= DFSM_MODE_START
;
1263 dfsm
->protocol_version
= protocol_version
;
1264 strcpy (dfsm
->cpg_group_name
.value
, group_name
);
1265 dfsm
->cpg_group_name
.length
= strlen (group_name
) + 1;
1267 dfsm
->cpg_callbacks
= &cpg_callbacks
;
1268 dfsm
->dfsm_callbacks
= callbacks
;
1270 dfsm
->members
= g_hash_table_new(dfsm_sync_info_hash
, dfsm_sync_info_equal
);
1274 if ((dfsm
->mode_mutex
= g_mutex_new()) == NULL
)
1285 dfsm_lowest_nodeid(dfsm_t
*dfsm
)
1287 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1289 if (dfsm
->lowest_nodeid
&& (dfsm
->lowest_nodeid
== dfsm
->nodeid
))
1296 dfsm_verify_request(dfsm_t
*dfsm
)
1298 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1300 /* only do when we have lowest nodeid */
1301 if (!dfsm
->lowest_nodeid
|| (dfsm
->lowest_nodeid
!= dfsm
->nodeid
))
1304 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1305 if (mode
!= DFSM_MODE_SYNCED
)
1309 struct iovec iov
[len
];
1311 if (dfsm
->csum_counter
!= dfsm
->csum_id
) {
1312 g_message("delay verify request %016zX", dfsm
->csum_counter
+ 1);
1316 dfsm
->csum_counter
++;
1317 iov
[0].iov_base
= (char *)&dfsm
->csum_counter
;
1318 iov
[0].iov_len
= sizeof(dfsm
->csum_counter
);
1320 cfs_debug("send verify request %016zX", dfsm
->csum_counter
);
1323 result
= dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_VERIFY_REQUEST
, iov
, len
);
1325 if (result
!= CS_OK
)
1326 cfs_dom_critical(dfsm
->log_domain
, "failed to send VERIFY_REQUEST message");
1335 cs_dispatch_flags_t dispatch_types
)
1337 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1338 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, CS_ERR_INVALID_PARAM
);
1342 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1345 result
= cpg_dispatch(dfsm
->cpg_handle
, dispatch_types
);
1346 if (result
== CPG_ERR_TRY_AGAIN
) {
1347 nanosleep(&tvreq
, NULL
);
1349 if ((retries
% 10) == 0)
1350 cfs_dom_message(dfsm
->log_domain
, "cpg_dispatch retry %d", retries
);
1354 if (!(result
== CS_OK
|| result
== CS_ERR_TRY_AGAIN
)) {
1355 cfs_dom_critical(dfsm
->log_domain
, "cpg_dispatch failed: %d", result
);
1363 dfsm_initialize(dfsm_t
*dfsm
, int *fd
)
1365 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1366 g_return_val_if_fail(fd
!= NULL
, CS_ERR_INVALID_PARAM
);
1368 /* remove old messages */
1369 dfsm_free_message_queue(dfsm
);
1370 dfsm_send_sync_message_abort(dfsm
);
1372 dfsm
->joined
= FALSE
;
1373 dfsm
->we_are_member
= 0;
1375 dfsm_set_mode(dfsm
, DFSM_MODE_START
);
1379 if (dfsm
->cpg_handle
== 0) {
1380 if ((result
= cpg_initialize(&dfsm
->cpg_handle
, dfsm
->cpg_callbacks
)) != CS_OK
) {
1381 cfs_dom_critical(dfsm
->log_domain
, "cpg_initialize failed: %d", result
);
1382 dfsm
->cpg_handle
= 0;
1386 if ((result
= cpg_local_get(dfsm
->cpg_handle
, &dfsm
->nodeid
)) != CS_OK
) {
1387 cfs_dom_critical(dfsm
->log_domain
, "cpg_local_get failed: %d", result
);
1391 dfsm
->pid
= getpid();
1393 result
= cpg_context_set(dfsm
->cpg_handle
, dfsm
);
1394 if (result
!= CS_OK
) {
1395 cfs_dom_critical(dfsm
->log_domain
, "cpg_context_set failed: %d", result
);
1400 result
= cpg_fd_get(dfsm
->cpg_handle
, fd
);
1401 if (result
!= CS_OK
) {
1402 cfs_dom_critical(dfsm
->log_domain
, "cpg_fd_get failed: %d", result
);
1409 if (dfsm
->cpg_handle
)
1410 cpg_finalize(dfsm
->cpg_handle
);
1411 dfsm
->cpg_handle
= 0;
1416 dfsm_join(dfsm_t
*dfsm
)
1418 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1419 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, CS_ERR_LIBRARY
);
1420 g_return_val_if_fail(dfsm
->joined
== 0, CS_ERR_EXIST
);
1424 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1427 result
= cpg_join(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
);
1428 if (result
== CPG_ERR_TRY_AGAIN
) {
1429 nanosleep(&tvreq
, NULL
);
1431 if ((retries
% 10) == 0)
1432 cfs_dom_message(dfsm
->log_domain
, "cpg_join retry %d", retries
);
1436 if (result
!= CS_OK
) {
1437 cfs_dom_critical(dfsm
->log_domain
, "cpg_join failed: %d", result
);
1441 dfsm
->joined
= TRUE
;
1446 dfsm_leave (dfsm_t
*dfsm
)
1448 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1449 g_return_val_if_fail(dfsm
->joined
, CS_ERR_NOT_EXIST
);
1453 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1456 result
= cpg_leave(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
);
1457 if (result
== CPG_ERR_TRY_AGAIN
) {
1458 nanosleep(&tvreq
, NULL
);
1460 if ((retries
% 10) == 0)
1461 cfs_dom_message(dfsm
->log_domain
, "cpg_leave retry %d", retries
);
1465 if (result
!= CS_OK
) {
1466 cfs_dom_critical(dfsm
->log_domain
, "cpg_leave failed: %d", result
);
1470 dfsm
->joined
= FALSE
;
/*
 * dfsm_finalize: shut down the CPG connection of @dfsm.
 *
 * A NULL @dfsm is rejected with FALSE. The function calls
 * dfsm_send_sync_message_abort() and, if a CPG handle is present,
 * finalizes it and resets the connection related fields.
 */
1476 dfsm_finalize(dfsm_t
*dfsm
)
1478 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1480 dfsm_send_sync_message_abort(dfsm
);
/*
 * Only touch corosync when we actually hold a handle. Afterwards the
 * handle is zeroed and the joined/member flags are cleared, which is the
 * state dfsm_initialize() and dfsm_join() test for.
 */
1485 if (dfsm
->cpg_handle
) {
1486 cpg_finalize(dfsm
->cpg_handle
);
1487 dfsm
->cpg_handle
= 0;
1488 dfsm
->joined
= FALSE
;
1489 dfsm
->we_are_member
= 0;
/*
 * dfsm_destroy: tear down @dfsm and release the resources it references.
 *
 * Does nothing for a NULL @dfsm. The CPG connection is finalized first,
 * then the user cleanup callback is given a chance to release sync data,
 * and finally the queues, synchronization primitives, hash tables and
 * strings are released.
 */
1496 dfsm_destroy(dfsm_t
*dfsm
)
1498 g_return_if_fail(dfsm
!= NULL
);
/* close the corosync connection before releasing anything else */
1500 dfsm_finalize(dfsm
);
/*
 * Let the user callback clean up, but only if there is sync info with
 * data attached and a cleanup callback was registered.
 */
1502 if (dfsm
->sync_info
&& dfsm
->sync_info
->data
&& dfsm
->dfsm_callbacks
->dfsm_cleanup_fn
)
1503 dfsm
->dfsm_callbacks
->dfsm_cleanup_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
1505 dfsm_free_sync_queue(dfsm
);
/* each synchronization primitive is freed only if it was created */
1507 if (dfsm
->mode_mutex
)
1508 g_mutex_free (dfsm
->mode_mutex
);
1510 if (dfsm
->sync_mutex
)
1511 g_mutex_free (dfsm
->sync_mutex
);
1513 if (dfsm
->sync_cond
)
1514 g_cond_free (dfsm
->sync_cond
);
1517 g_hash_table_destroy(dfsm
->results
);
/* empty the message queue first, then free the sequence itself */
1519 if (dfsm
->msg_queue
) {
1520 dfsm_free_message_queue(dfsm
);
1521 g_sequence_free(dfsm
->msg_queue
);
1524 if (dfsm
->sync_info
)
1525 g_free(dfsm
->sync_info
);
/*
 * NOTE(review): dfsm_finalize() above already zeroes cpg_handle after
 * finalizing it, so this second check looks redundant - confirm.
 */
1527 if (dfsm
->cpg_handle
)
1528 cpg_finalize(dfsm
->cpg_handle
);
1531 g_hash_table_destroy(dfsm
->members
);
1533 if (dfsm
->log_domain
)
1534 g_free(dfsm
->log_domain
);
1541 } service_dfsm_private_t
; /* context passed to the cfs_service callbacks; its dfsm member is the state machine they act on */
1544 service_dfsm_finalize(
1545 cfs_service_t
*service
,
1548 g_return_val_if_fail(service
!= NULL
, FALSE
);
1549 g_return_val_if_fail(context
!= NULL
, FALSE
);
1551 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1552 dfsm_t
*dfsm
= private->dfsm
;
1554 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1556 return dfsm_finalize(dfsm
);
1560 service_dfsm_initialize(
1561 cfs_service_t
*service
,
1564 g_return_val_if_fail(service
!= NULL
, -1);
1565 g_return_val_if_fail(context
!= NULL
, -1);
1567 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1568 dfsm_t
*dfsm
= private->dfsm
;
1570 g_return_val_if_fail(dfsm
!= NULL
, -1);
1572 /* serious internal error - don't try to recover */
1573 if (!dfsm_restartable(dfsm
))
1579 if ((result
= dfsm_initialize(dfsm
, &fd
)) != CS_OK
)
1582 result
= dfsm_join(dfsm
);
1583 if (result
!= CS_OK
) {
1584 /* we can't dispatch if not joined, so we need to finalize */
1585 dfsm_finalize(dfsm
);
/*
 * service_dfsm_dispatch: cfs_service dispatch callback.
 *
 * Dispatches one pending CPG event for the dfsm wrapped in @context and
 * reacts to error conditions: a failing dispatch, or a state machine
 * that has entered an error mode, in which case the group is left.
 * FALSE is returned when an argument, the wrapped dfsm or its CPG
 * handle is missing.
 */
1593 service_dfsm_dispatch(
1594 cfs_service_t
*service
,
1597 g_return_val_if_fail(service
!= NULL
, FALSE
);
1598 g_return_val_if_fail(context
!= NULL
, FALSE
);
1600 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1601 dfsm_t
*dfsm
= private->dfsm
;
1603 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1604 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, FALSE
);
/*
 * Handle exactly one event per call.
 * NOTE(review): CPG_DISPATCH_ONE looks like the legacy spelling of
 * CS_DISPATCH_ONE - confirm the corosync headers in use still provide it.
 */
1608 result
= dfsm_dispatch(dfsm
, CPG_DISPATCH_ONE
);
/* a dead library or bad handle is tested separately from other failures */
1609 if (result
== CS_ERR_LIBRARY
|| result
== CS_ERR_BAD_HANDLE
)
1611 if (result
!= CS_OK
)
/* modes >= DFSM_ERROR_MODE_START are the abnormal/error modes */
1614 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1615 if (mode
>= DFSM_ERROR_MODE_START
) {
/* in an error mode we leave the CPG group; errors are classified as above */
1617 result
= dfsm_leave(dfsm
);
1618 if (result
== CS_ERR_LIBRARY
|| result
== CS_ERR_BAD_HANDLE
)
1620 if (result
!= CS_OK
)
1623 if (!dfsm
->we_are_member
)
/* tell the service layer whether this dfsm may be restarted */
1631 cfs_service_set_restartable(service
, dfsm_restartable(dfsm
));
/* drop the CPG connection */
1635 dfsm_finalize(dfsm
);
1641 cfs_service_t
*service
,
1644 g_return_if_fail(service
!= NULL
);
1645 g_return_if_fail(context
!= NULL
);
1647 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1648 dfsm_t
*dfsm
= private->dfsm
;
1650 g_return_if_fail(dfsm
!= NULL
);
1652 dfsm_verify_request(dfsm
);
1655 static cfs_service_callbacks_t cfs_dfsm_callbacks
= {
1656 .cfs_service_initialize_fn
= service_dfsm_initialize
,
1657 .cfs_service_finalize_fn
= service_dfsm_finalize
,
1658 .cfs_service_dispatch_fn
= service_dfsm_dispatch
,
1659 .cfs_service_timer_fn
= service_dfsm_timer
,
1663 service_dfsm_new(dfsm_t
*dfsm
)
1665 cfs_service_t
*service
;
1667 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
1669 service_dfsm_private_t
*private = g_new0(service_dfsm_private_t
, 1);
1673 private->dfsm
= dfsm
;
1675 service
= cfs_service_new(&cfs_dfsm_callbacks
, dfsm
->log_domain
, private);
1681 service_dfsm_destroy(cfs_service_t
*service
)
1683 g_return_if_fail(service
!= NULL
);
1685 service_dfsm_private_t
*private =
1686 (service_dfsm_private_t
*)cfs_service_get_context(service
);