2 Copyright (C) 2010 Proxmox Server Solutions GmbH
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 Author: Dietmar Maurer <dietmar@proxmox.com>
22 /* NOTE: we try to keep the CPG handle as long as possible, because
23 * calling cpg_initialize/cpg_finalize multiple times from the
24 * same process confuses corosync.
25 * Note: CS_ERR_LIBRARY is returned when corosync died
30 #endif /* HAVE_CONFIG_H */
32 #include <sys/types.h>
37 #include <corosync/corotypes.h>
38 #include <corosync/cpg.h>
41 #include "cfs-utils.h"
44 static cpg_callbacks_t cpg_callbacks
;
48 DFSM_MODE_START_SYNC
= 1,
52 /* values >= 128 indicate abnormal/error conditions */
53 DFSM_ERROR_MODE_START
= 128,
54 DFSM_MODE_LEAVE
= 253,
55 DFSM_MODE_VERSION_ERROR
= 254,
56 DFSM_MODE_ERROR
= 255,
60 DFSM_MESSAGE_NORMAL
= 0,
61 DFSM_MESSAGE_SYNC_START
= 1,
62 DFSM_MESSAGE_STATE
= 2,
63 DFSM_MESSAGE_UPDATE
= 3,
64 DFSM_MESSAGE_UPDATE_COMPLETE
= 4,
65 DFSM_MESSAGE_VERIFY_REQUEST
= 5,
66 DFSM_MESSAGE_VERIFY
= 6,
69 #define DFSM_VALID_STATE_MESSAGE(mt) (mt >= DFSM_MESSAGE_SYNC_START && mt <= DFSM_MESSAGE_VERIFY)
74 uint32_t protocol_version
;
77 } dfsm_message_header_t
;
80 uint32_t epoch
; // per process (not globally unique)
87 dfsm_message_header_t base
;
88 dfsm_sync_epoch_t epoch
;
89 } dfsm_message_state_header_t
;
92 dfsm_message_header_t base
;
94 } dfsm_message_normal_header_t
;
101 int msg_len
; // fixme: unsigned?
102 } dfsm_queued_message_t
;
105 const char *log_domain
;
106 cpg_callbacks_t
*cpg_callbacks
;
107 dfsm_callbacks_t
*dfsm_callbacks
;
108 cpg_handle_t cpg_handle
;
109 struct cpg_name cpg_group_name
;
114 guint32 protocol_version
;
119 /* mode is protected with mode_mutex */
123 GHashTable
*members
; /* contains dfsm_node_info_t pointers */
124 dfsm_sync_info_t
*sync_info
;
125 uint32_t local_epoch_counter
;
126 dfsm_sync_epoch_t sync_epoch
;
127 uint32_t lowest_nodeid
;
128 GSequence
*msg_queue
;
131 /* synchronous message transmission, protected with sync_mutex */
136 uint64_t msgcount_rcvd
;
138 /* state verification */
140 dfsm_sync_epoch_t csum_epoch
;
142 uint64_t csum_counter
;
145 static gboolean
dfsm_deliver_queue(dfsm_t
*dfsm
);
146 static gboolean
dfsm_deliver_sync_queue(dfsm_t
*dfsm
);
149 dfsm_nodeid_is_local(
154 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
156 return (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
);
161 dfsm_send_sync_message_abort(dfsm_t
*dfsm
)
163 g_return_if_fail(dfsm
!= NULL
);
165 g_mutex_lock (&dfsm
->sync_mutex
);
166 dfsm
->msgcount_rcvd
= dfsm
->msgcount
;
167 g_cond_broadcast (&dfsm
->sync_cond
);
168 g_mutex_unlock (&dfsm
->sync_mutex
);
172 dfsm_record_local_result(
178 g_return_if_fail(dfsm
!= NULL
);
179 g_return_if_fail(dfsm
->results
!= NULL
);
181 g_mutex_lock (&dfsm
->sync_mutex
);
182 dfsm_result_t
*rp
= (dfsm_result_t
*)g_hash_table_lookup(dfsm
->results
, &msg_count
);
184 rp
->result
= msg_result
;
185 rp
->processed
= processed
;
187 dfsm
->msgcount_rcvd
= msg_count
;
188 g_cond_broadcast (&dfsm
->sync_cond
);
189 g_mutex_unlock (&dfsm
->sync_mutex
);
193 dfsm_send_message_full(
199 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
200 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
202 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
206 result
= cpg_mcast_joined(dfsm
->cpg_handle
, CPG_TYPE_AGREED
, iov
, len
);
207 if (retry
&& result
== CS_ERR_TRY_AGAIN
) {
208 nanosleep(&tvreq
, NULL
);
210 if ((retries
% 10) == 0)
211 cfs_dom_message(dfsm
->log_domain
, "cpg_send_message retry %d", retries
);
217 cfs_dom_message(dfsm
->log_domain
, "cpg_send_message retried %d times", retries
);
219 if (result
!= CS_OK
&&
220 (!retry
|| result
!= CS_ERR_TRY_AGAIN
))
221 cfs_dom_critical(dfsm
->log_domain
, "cpg_send_message failed: %d", result
);
227 dfsm_send_state_message_full(
233 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
234 g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type
), CS_ERR_INVALID_PARAM
);
235 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
237 dfsm_message_state_header_t header
;
238 header
.base
.type
= type
;
239 header
.base
.subtype
= 0;
240 header
.base
.protocol_version
= dfsm
->protocol_version
;
241 header
.base
.time
= time(NULL
);
242 header
.base
.reserved
= 0;
244 header
.epoch
= dfsm
->sync_epoch
;
246 struct iovec real_iov
[len
+ 1];
248 real_iov
[0].iov_base
= (char *)&header
;
249 real_iov
[0].iov_len
= sizeof(header
);
251 for (int i
= 0; i
< len
; i
++)
252 real_iov
[i
+ 1] = iov
[i
];
254 return dfsm_send_message_full(dfsm
, real_iov
, len
+ 1, 1);
263 return dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_UPDATE
, iov
, len
);
267 dfsm_send_update_complete(dfsm_t
*dfsm
)
269 return dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_UPDATE_COMPLETE
, NULL
, 0);
280 return dfsm_send_message_sync(dfsm
, msgtype
, iov
, len
, NULL
);
284 dfsm_send_message_sync(
291 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
292 g_return_val_if_fail(!len
|| iov
!= NULL
, CS_ERR_INVALID_PARAM
);
294 g_mutex_lock (&dfsm
->sync_mutex
);
295 /* note: hold lock until message is sent - to guarantee ordering */
296 uint64_t msgcount
= ++dfsm
->msgcount
;
298 rp
->msgcount
= msgcount
;
300 g_hash_table_replace(dfsm
->results
, &rp
->msgcount
, rp
);
303 dfsm_message_normal_header_t header
;
304 header
.base
.type
= DFSM_MESSAGE_NORMAL
;
305 header
.base
.subtype
= msgtype
;
306 header
.base
.protocol_version
= dfsm
->protocol_version
;
307 header
.base
.time
= time(NULL
);
308 header
.base
.reserved
= 0;
309 header
.count
= msgcount
;
311 struct iovec real_iov
[len
+ 1];
313 real_iov
[0].iov_base
= (char *)&header
;
314 real_iov
[0].iov_len
= sizeof(header
);
316 for (int i
= 0; i
< len
; i
++)
317 real_iov
[i
+ 1] = iov
[i
];
319 cs_error_t result
= dfsm_send_message_full(dfsm
, real_iov
, len
+ 1, 1);
321 g_mutex_unlock (&dfsm
->sync_mutex
);
323 if (result
!= CS_OK
) {
324 cfs_dom_critical(dfsm
->log_domain
, "cpg_send_message failed: %d", result
);
327 g_mutex_lock (&dfsm
->sync_mutex
);
328 g_hash_table_remove(dfsm
->results
, &rp
->msgcount
);
329 g_mutex_unlock (&dfsm
->sync_mutex
);
335 g_mutex_lock (&dfsm
->sync_mutex
);
337 while (dfsm
->msgcount_rcvd
< msgcount
)
338 g_cond_wait (&dfsm
->sync_cond
, &dfsm
->sync_mutex
);
341 g_hash_table_remove(dfsm
->results
, &rp
->msgcount
);
343 g_mutex_unlock (&dfsm
->sync_mutex
);
345 return rp
->processed
? CS_OK
: CS_ERR_FAILED_OPERATION
;
352 dfsm_send_checksum(dfsm_t
*dfsm
)
354 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
357 struct iovec iov
[len
];
359 iov
[0].iov_base
= (char *)&dfsm
->csum_id
;
360 iov
[0].iov_len
= sizeof(dfsm
->csum_id
);
361 iov
[1].iov_base
= dfsm
->csum
;
362 iov
[1].iov_len
= sizeof(dfsm
->csum
);
364 gboolean res
= (dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_VERIFY
, iov
, len
) == CS_OK
);
370 dfsm_free_queue_entry(gpointer data
)
372 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)data
;
378 dfsm_free_message_queue(dfsm_t
*dfsm
)
380 g_return_if_fail(dfsm
!= NULL
);
381 g_return_if_fail(dfsm
->msg_queue
!= NULL
);
383 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
384 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
385 while (iter
!= end
) {
386 GSequenceIter
*cur
= iter
;
387 iter
= g_sequence_iter_next(iter
);
388 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
390 dfsm_free_queue_entry(qm
);
391 g_sequence_remove(cur
);
396 dfsm_free_sync_queue(dfsm_t
*dfsm
)
398 g_return_if_fail(dfsm
!= NULL
);
400 GList
*iter
= dfsm
->sync_queue
;
402 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)iter
->data
;
403 iter
= g_list_next(iter
);
404 dfsm_free_queue_entry(qm
);
407 g_list_free(dfsm
->sync_queue
);
408 dfsm
->sync_queue
= NULL
;
412 message_queue_sort_fn(
417 return ((dfsm_queued_message_t
*)a
)->msg_count
-
418 ((dfsm_queued_message_t
*)b
)->msg_count
;
421 static dfsm_node_info_t
*
422 dfsm_node_info_lookup(
427 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
428 g_return_val_if_fail(dfsm
->members
!= NULL
, NULL
);
430 dfsm_node_info_t info
= { .nodeid
= nodeid
, .pid
= pid
};
432 return (dfsm_node_info_t
*)g_hash_table_lookup(dfsm
->members
, &info
);
435 static dfsm_queued_message_t
*
436 dfsm_queue_add_message(
444 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
445 g_return_val_if_fail(msg
!= NULL
, NULL
);
446 g_return_val_if_fail(msg_len
!= 0, NULL
);
448 dfsm_node_info_t
*ni
= dfsm_node_info_lookup(dfsm
, nodeid
, pid
);
450 cfs_dom_critical(dfsm
->log_domain
, "dfsm_node_info_lookup failed");
454 dfsm_queued_message_t
*qm
= g_new0(dfsm_queued_message_t
, 1);
455 g_return_val_if_fail(qm
!= NULL
, NULL
);
459 qm
->msg
= g_memdup (msg
, msg_len
);
460 qm
->msg_len
= msg_len
;
461 qm
->msg_count
= msg_count
;
463 if (dfsm
->mode
== DFSM_MODE_UPDATE
&& ni
->synced
) {
464 dfsm
->sync_queue
= g_list_append(dfsm
->sync_queue
, qm
);
466 /* NOTE: we only need to sort the queue because we resend all
467 * queued messages sometimes.
469 g_sequence_insert_sorted(dfsm
->msg_queue
, qm
, message_queue_sort_fn
, NULL
);
476 dfsm_sync_info_hash(gconstpointer key
)
478 dfsm_node_info_t
*info
= (dfsm_node_info_t
*)key
;
480 return g_int_hash(&info
->nodeid
) + g_int_hash(&info
->pid
);
484 dfsm_sync_info_equal(
488 dfsm_node_info_t
*info1
= (dfsm_node_info_t
*)v1
;
489 dfsm_node_info_t
*info2
= (dfsm_node_info_t
*)v2
;
491 if (info1
->nodeid
== info2
->nodeid
&&
492 info1
->pid
== info2
->pid
)
499 dfsm_sync_info_compare(
503 dfsm_node_info_t
*info1
= (dfsm_node_info_t
*)v1
;
504 dfsm_node_info_t
*info2
= (dfsm_node_info_t
*)v2
;
506 if (info1
->nodeid
!= info2
->nodeid
)
507 return info1
->nodeid
- info2
->nodeid
;
509 return info1
->pid
- info2
->pid
;
515 dfsm_mode_t new_mode
)
517 g_return_if_fail(dfsm
!= NULL
);
519 cfs_debug("dfsm_set_mode - set mode to %d", new_mode
);
522 g_mutex_lock (&dfsm
->mode_mutex
);
523 if (dfsm
->mode
!= new_mode
) {
524 if (new_mode
< DFSM_ERROR_MODE_START
||
525 (dfsm
->mode
< DFSM_ERROR_MODE_START
|| new_mode
>= dfsm
->mode
)) {
526 dfsm
->mode
= new_mode
;
530 g_mutex_unlock (&dfsm
->mode_mutex
);
535 if (new_mode
== DFSM_MODE_START
) {
536 cfs_dom_message(dfsm
->log_domain
, "start cluster connection");
537 } else if (new_mode
== DFSM_MODE_START_SYNC
) {
538 cfs_dom_message(dfsm
->log_domain
, "starting data syncronisation");
539 } else if (new_mode
== DFSM_MODE_SYNCED
) {
540 cfs_dom_message(dfsm
->log_domain
, "all data is up to date");
541 if (dfsm
->dfsm_callbacks
->dfsm_synced_fn
)
542 dfsm
->dfsm_callbacks
->dfsm_synced_fn(dfsm
);
543 } else if (new_mode
== DFSM_MODE_UPDATE
) {
544 cfs_dom_message(dfsm
->log_domain
, "waiting for updates from leader");
545 } else if (new_mode
== DFSM_MODE_LEAVE
) {
546 cfs_dom_critical(dfsm
->log_domain
, "leaving CPG group");
547 } else if (new_mode
== DFSM_MODE_ERROR
) {
548 cfs_dom_critical(dfsm
->log_domain
, "serious internal error - stop cluster connection");
549 } else if (new_mode
== DFSM_MODE_VERSION_ERROR
) {
550 cfs_dom_critical(dfsm
->log_domain
, "detected newer protocol - please update this node");
555 dfsm_get_mode(dfsm_t
*dfsm
)
557 g_return_val_if_fail(dfsm
!= NULL
, DFSM_MODE_ERROR
);
559 g_mutex_lock (&dfsm
->mode_mutex
);
560 dfsm_mode_t mode
= dfsm
->mode
;
561 g_mutex_unlock (&dfsm
->mode_mutex
);
567 dfsm_restartable(dfsm_t
*dfsm
)
569 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
571 return !(mode
== DFSM_MODE_ERROR
||
572 mode
== DFSM_MODE_VERSION_ERROR
);
576 dfsm_set_errormode(dfsm_t
*dfsm
)
578 dfsm_set_mode(dfsm
, DFSM_MODE_ERROR
);
582 dfsm_release_sync_resources(
584 const struct cpg_address
*member_list
,
585 size_t member_list_entries
)
587 g_return_if_fail(dfsm
!= NULL
);
588 g_return_if_fail(dfsm
->members
!= NULL
);
589 g_return_if_fail(!member_list_entries
|| member_list
!= NULL
);
591 cfs_debug("enter dfsm_release_sync_resources");
593 if (dfsm
->sync_info
) {
595 if (dfsm
->sync_info
->data
&& dfsm
->dfsm_callbacks
->dfsm_cleanup_fn
) {
596 dfsm
->dfsm_callbacks
->dfsm_cleanup_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
597 dfsm
->sync_info
->data
= NULL
;
600 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++) {
601 if (dfsm
->sync_info
->nodes
[i
].state
) {
602 g_free(dfsm
->sync_info
->nodes
[i
].state
);
603 dfsm
->sync_info
->nodes
[i
].state
= NULL
;
604 dfsm
->sync_info
->nodes
[i
].state_len
= 0;
611 g_hash_table_remove_all(dfsm
->members
);
614 g_free(dfsm
->sync_info
);
616 int size
= sizeof(dfsm_sync_info_t
) +
617 member_list_entries
*sizeof(dfsm_sync_info_t
);
618 dfsm_sync_info_t
*sync_info
= dfsm
->sync_info
= g_malloc0(size
);
619 sync_info
->node_count
= member_list_entries
;
621 for (int i
= 0; i
< member_list_entries
; i
++) {
622 sync_info
->nodes
[i
].nodeid
= member_list
[i
].nodeid
;
623 sync_info
->nodes
[i
].pid
= member_list
[i
].pid
;
626 qsort(sync_info
->nodes
, member_list_entries
, sizeof(dfsm_node_info_t
),
627 dfsm_sync_info_compare
);
629 for (int i
= 0; i
< member_list_entries
; i
++) {
630 dfsm_node_info_t
*info
= &sync_info
->nodes
[i
];
631 g_hash_table_insert(dfsm
->members
, info
, info
);
632 if (info
->nodeid
== dfsm
->nodeid
&& info
->pid
== dfsm
->pid
)
633 sync_info
->local
= info
;
639 dfsm_cpg_deliver_callback(
641 const struct cpg_name
*group_name
,
650 result
= cpg_context_get(handle
, (gpointer
*)&dfsm
);
651 if (result
!= CS_OK
|| !dfsm
|| dfsm
->cpg_callbacks
!= &cpg_callbacks
) {
652 cfs_critical("cpg_context_get error: %d (%p)", result
, dfsm
);
653 return; /* we have no valid dfsm pointer, so we can just ignore this */
655 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
657 cfs_dom_debug(dfsm
->log_domain
, "dfsm mode is %d", mode
);
659 if (mode
>= DFSM_ERROR_MODE_START
) {
660 cfs_dom_debug(dfsm
->log_domain
, "error mode - ignoring message");
664 if (!dfsm
->sync_info
) {
665 cfs_dom_critical(dfsm
->log_domain
, "no dfsm_sync_info - internal error");
669 if (msg_len
< sizeof(dfsm_message_header_t
)) {
670 cfs_dom_critical(dfsm
->log_domain
, "received short message (%ld bytes)", msg_len
);
674 dfsm_message_header_t
*base_header
= (dfsm_message_header_t
*)msg
;
676 if (base_header
->protocol_version
> dfsm
->protocol_version
) {
677 cfs_dom_critical(dfsm
->log_domain
, "received message with protocol version %d",
678 base_header
->protocol_version
);
679 dfsm_set_mode(dfsm
, DFSM_MODE_VERSION_ERROR
);
681 } else if (base_header
->protocol_version
< dfsm
->protocol_version
) {
682 cfs_dom_message(dfsm
->log_domain
, "ignore message with wrong protocol version %d",
683 base_header
->protocol_version
);
687 if (base_header
->type
== DFSM_MESSAGE_NORMAL
) {
689 dfsm_message_normal_header_t
*header
= (dfsm_message_normal_header_t
*)msg
;
691 if (msg_len
< sizeof(dfsm_message_normal_header_t
)) {
692 cfs_dom_critical(dfsm
->log_domain
, "received short message (type = %d, subtype = %d, %ld bytes)",
693 base_header
->type
, base_header
->subtype
, msg_len
);
697 if (mode
!= DFSM_MODE_SYNCED
) {
698 cfs_dom_debug(dfsm
->log_domain
, "queue message %zu (subtype = %d, length = %ld)",
699 header
->count
, base_header
->subtype
, msg_len
);
701 if (!dfsm_queue_add_message(dfsm
, nodeid
, pid
, header
->count
, msg
, msg_len
))
706 int res
= dfsm
->dfsm_callbacks
->dfsm_deliver_fn(
707 dfsm
, dfsm
->data
, &msg_res
, nodeid
, pid
, base_header
->subtype
,
708 base_header
->time
, msg
+ sizeof(dfsm_message_normal_header_t
),
709 msg_len
- sizeof(dfsm_message_normal_header_t
));
711 if (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
)
712 dfsm_record_local_result(dfsm
, header
->count
, msg_res
, res
);
721 /* state related messages
722 * we need the right epoch - else we simply discard the message
725 dfsm_message_state_header_t
*header
= (dfsm_message_state_header_t
*)msg
;
727 if (msg_len
< sizeof(dfsm_message_state_header_t
)) {
728 cfs_dom_critical(dfsm
->log_domain
, "received short state message (type = %d, subtype = %d, %ld bytes)",
729 base_header
->type
, base_header
->subtype
, msg_len
);
733 if (base_header
->type
!= DFSM_MESSAGE_SYNC_START
&&
734 (memcmp(&header
->epoch
, &dfsm
->sync_epoch
, sizeof(dfsm_sync_epoch_t
)) != 0)) {
735 cfs_dom_debug(dfsm
->log_domain
, "ignore message (msg_type == %d) with "
736 "wrong epoch (epoch %d/%d/%08X)", base_header
->type
,
737 header
->epoch
.nodeid
, header
->epoch
.pid
, header
->epoch
.epoch
);
741 msg
+= sizeof(dfsm_message_state_header_t
);
742 msg_len
-= sizeof(dfsm_message_state_header_t
);
744 if (mode
== DFSM_MODE_SYNCED
) {
745 if (base_header
->type
== DFSM_MESSAGE_UPDATE_COMPLETE
) {
747 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++)
748 dfsm
->sync_info
->nodes
[i
].synced
= 1;
750 if (!dfsm_deliver_queue(dfsm
))
755 } else if (base_header
->type
== DFSM_MESSAGE_VERIFY_REQUEST
) {
757 if (msg_len
!= sizeof(dfsm
->csum_counter
)) {
758 cfs_dom_critical(dfsm
->log_domain
, "cpg received verify request with wrong length (%ld bytes) form node %d/%d", msg_len
, nodeid
, pid
);
762 uint64_t csum_id
= *((uint64_t *)msg
);
763 msg
+= 8; msg_len
-= 8;
765 cfs_dom_debug(dfsm
->log_domain
, "got verify request from node %d %016zX", nodeid
, csum_id
);
767 if (dfsm
->dfsm_callbacks
->dfsm_checksum_fn
) {
768 if (!dfsm
->dfsm_callbacks
->dfsm_checksum_fn(
769 dfsm
, dfsm
->data
, dfsm
->csum
, sizeof(dfsm
->csum
))) {
770 cfs_dom_critical(dfsm
->log_domain
, "unable to compute data checksum");
774 dfsm
->csum_epoch
= header
->epoch
;
775 dfsm
->csum_id
= csum_id
;
777 if (nodeid
== dfsm
->nodeid
&& pid
== dfsm
->pid
) {
778 if (!dfsm_send_checksum(dfsm
))
785 } else if (base_header
->type
== DFSM_MESSAGE_VERIFY
) {
787 cfs_dom_debug(dfsm
->log_domain
, "received verify message");
789 if (dfsm
->dfsm_callbacks
->dfsm_checksum_fn
) {
791 if (msg_len
!= (sizeof(dfsm
->csum_id
) + sizeof(dfsm
->csum
))) {
792 cfs_dom_critical(dfsm
->log_domain
, "cpg received verify message with wrong length (%ld bytes)", msg_len
);
796 uint64_t csum_id
= *((uint64_t *)msg
);
797 msg
+= 8; msg_len
-= 8;
799 if (dfsm
->csum_id
== csum_id
&&
800 (memcmp(&dfsm
->csum_epoch
, &header
->epoch
, sizeof(dfsm_sync_epoch_t
)) == 0)) {
801 if (memcmp(msg
, dfsm
->csum
, sizeof(dfsm
->csum
)) != 0) {
802 cfs_dom_critical(dfsm
->log_domain
, "wrong checksum %016zX != %016zX - restarting",
803 *(uint64_t *)msg
, *(uint64_t *)dfsm
->csum
);
806 cfs_dom_message(dfsm
->log_domain
, "data verification successful");
809 cfs_dom_message(dfsm
->log_domain
, "skip verification - no checksum saved");
816 /* ignore (we already got all required updates, or we are leader) */
817 cfs_dom_debug(dfsm
->log_domain
, "ignore state sync message %d",
822 } else if (mode
== DFSM_MODE_START_SYNC
) {
824 if (base_header
->type
== DFSM_MESSAGE_SYNC_START
) {
826 if (nodeid
!= dfsm
->lowest_nodeid
) {
827 cfs_dom_critical(dfsm
->log_domain
, "ignore sync request from wrong member %d/%d",
831 cfs_dom_message(dfsm
->log_domain
, "received sync request (epoch %d/%d/%08X)",
832 header
->epoch
.nodeid
, header
->epoch
.pid
, header
->epoch
.epoch
);
834 dfsm
->sync_epoch
= header
->epoch
;
836 dfsm_release_sync_resources(dfsm
, NULL
, 0);
838 unsigned int state_len
= 0;
839 gpointer state
= NULL
;
841 state
= dfsm
->dfsm_callbacks
->dfsm_get_state_fn(dfsm
, dfsm
->data
, &state_len
);
843 if (!(state
&& state_len
)) {
844 cfs_dom_critical(dfsm
->log_domain
, "dfsm_get_state_fn failed");
849 iov
[0].iov_base
= state
;
850 iov
[0].iov_len
= state_len
;
852 result
= dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_STATE
, iov
, 1);
862 } else if (base_header
->type
== DFSM_MESSAGE_STATE
) {
864 dfsm_node_info_t
*ni
;
866 if (!(ni
= dfsm_node_info_lookup(dfsm
, nodeid
, pid
))) {
867 cfs_dom_critical(dfsm
->log_domain
, "received state for non-member %d/%d", nodeid
, pid
);
872 cfs_dom_critical(dfsm
->log_domain
, "received duplicate state for member %d/%d", nodeid
, pid
);
876 ni
->state
= g_memdup(msg
, msg_len
);
877 ni
->state_len
= msg_len
;
879 int received_all
= 1;
880 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++) {
881 if (!dfsm
->sync_info
->nodes
[i
].state
) {
888 cfs_dom_message(dfsm
->log_domain
, "received all states");
890 int res
= dfsm
->dfsm_callbacks
->dfsm_process_state_update_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
894 if (dfsm
->sync_info
->local
->synced
) {
895 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
896 dfsm_release_sync_resources(dfsm
, NULL
, 0);
898 if (!dfsm_deliver_queue(dfsm
))
902 dfsm_set_mode(dfsm
, DFSM_MODE_UPDATE
);
904 if (!dfsm_deliver_queue(dfsm
))
913 } else if (mode
== DFSM_MODE_UPDATE
) {
915 if (base_header
->type
== DFSM_MESSAGE_UPDATE
) {
917 int res
= dfsm
->dfsm_callbacks
->dfsm_process_update_fn(
918 dfsm
, dfsm
->data
, dfsm
->sync_info
, nodeid
, pid
, msg
, msg_len
);
925 } else if (base_header
->type
== DFSM_MESSAGE_UPDATE_COMPLETE
) {
928 int res
= dfsm
->dfsm_callbacks
->dfsm_commit_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
933 for (int i
= 0; i
< dfsm
->sync_info
->node_count
; i
++)
934 dfsm
->sync_info
->nodes
[i
].synced
= 1;
936 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
938 if (!dfsm_deliver_sync_queue(dfsm
))
941 if (!dfsm_deliver_queue(dfsm
))
944 dfsm_release_sync_resources(dfsm
, NULL
, 0);
950 cfs_dom_critical(dfsm
->log_domain
, "internal error - unknown mode %d", mode
);
954 if (base_header
->type
== DFSM_MESSAGE_VERIFY_REQUEST
||
955 base_header
->type
== DFSM_MESSAGE_VERIFY
) {
957 cfs_dom_debug(dfsm
->log_domain
, "ignore verify message %d while not synced", base_header
->type
);
960 cfs_dom_critical(dfsm
->log_domain
, "received unknown state message type (type = %d, %ld bytes)",
961 base_header
->type
, msg_len
);
966 dfsm_set_mode(dfsm
, DFSM_MODE_LEAVE
);
967 dfsm_release_sync_resources(dfsm
, NULL
, 0);
972 dfsm_resend_queue(dfsm_t
*dfsm
)
974 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
975 g_return_val_if_fail(dfsm
->msg_queue
!= NULL
, FALSE
);
977 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
978 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
981 while (iter
!= end
) {
982 GSequenceIter
*cur
= iter
;
983 iter
= g_sequence_iter_next(iter
);
985 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
988 if (qm
->nodeid
== dfsm
->nodeid
&& qm
->pid
== dfsm
->pid
) {
991 iov
[0].iov_base
= qm
->msg
;
992 iov
[0].iov_len
= qm
->msg_len
;
994 if ((result
= dfsm_send_message_full(dfsm
, iov
, 1, 1)) != CS_OK
) {
1001 dfsm_free_message_queue(dfsm
);
1007 dfsm_deliver_sync_queue(dfsm_t
*dfsm
)
1009 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1011 if (!dfsm
->sync_queue
)
1014 gboolean res
= TRUE
;
1017 cfs_dom_message(dfsm
->log_domain
, "%s: queue length %d", __func__
,
1018 g_list_length(dfsm
->sync_queue
));
1020 GList
*iter
= dfsm
->sync_queue
;
1022 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)iter
->data
;
1023 iter
= g_list_next(iter
);
1025 if (res
&& dfsm
->mode
== DFSM_MODE_SYNCED
) {
1026 dfsm_cpg_deliver_callback(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
,
1027 qm
->nodeid
, qm
->pid
, qm
->msg
, qm
->msg_len
);
1032 dfsm_free_queue_entry(qm
);
1034 g_list_free(dfsm
->sync_queue
);
1035 dfsm
->sync_queue
= NULL
;
1041 dfsm_deliver_queue(dfsm_t
*dfsm
)
1043 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1044 g_return_val_if_fail(dfsm
->msg_queue
!= NULL
, FALSE
);
1046 int qlen
= g_sequence_get_length(dfsm
->msg_queue
);
1050 GSequenceIter
*iter
= g_sequence_get_begin_iter(dfsm
->msg_queue
);
1051 GSequenceIter
*end
= g_sequence_get_end_iter(dfsm
->msg_queue
);
1052 gboolean res
= TRUE
;
1055 cfs_dom_message(dfsm
->log_domain
, "%s: queue length %d", __func__
, qlen
);
1057 while (iter
!= end
) {
1058 GSequenceIter
*cur
= iter
;
1059 iter
= g_sequence_iter_next(iter
);
1061 dfsm_queued_message_t
*qm
= (dfsm_queued_message_t
*)
1062 g_sequence_get(cur
);
1064 dfsm_node_info_t
*ni
= dfsm_node_info_lookup(dfsm
, qm
->nodeid
, qm
->pid
);
1066 cfs_dom_message(dfsm
->log_domain
, "remove message from non-member %d/%d",
1067 qm
->nodeid
, qm
->pid
);
1068 dfsm_free_queue_entry(qm
);
1069 g_sequence_remove(cur
);
1073 if (dfsm
->mode
== DFSM_MODE_SYNCED
) {
1075 dfsm_cpg_deliver_callback(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
,
1076 qm
->nodeid
, qm
->pid
, qm
->msg
, qm
->msg_len
);
1077 dfsm_free_queue_entry(qm
);
1078 g_sequence_remove(cur
);
1080 } else if (dfsm
->mode
== DFSM_MODE_UPDATE
) {
1082 dfsm
->sync_queue
= g_list_append(dfsm
->sync_queue
, qm
);
1083 g_sequence_remove(cur
);
1095 dfsm_cpg_confchg_callback(
1096 cpg_handle_t handle
,
1097 const struct cpg_name
*group_name
,
1098 const struct cpg_address
*member_list
,
1099 size_t member_list_entries
,
1100 const struct cpg_address
*left_list
,
1101 size_t left_list_entries
,
1102 const struct cpg_address
*joined_list
,
1103 size_t joined_list_entries
)
1107 dfsm_t
*dfsm
= NULL
;
1108 result
= cpg_context_get(handle
, (gpointer
*)&dfsm
);
1109 if (result
!= CS_OK
|| !dfsm
|| dfsm
->cpg_callbacks
!= &cpg_callbacks
) {
1110 cfs_critical("cpg_context_get error: %d (%p)", result
, dfsm
);
1111 return; /* we have no valid dfsm pointer, so we can just ignore this */
1114 dfsm
->we_are_member
= 0;
1116 /* create new epoch */
1117 dfsm
->local_epoch_counter
++;
1118 dfsm
->sync_epoch
.epoch
= dfsm
->local_epoch_counter
;
1119 dfsm
->sync_epoch
.nodeid
= dfsm
->nodeid
;
1120 dfsm
->sync_epoch
.pid
= dfsm
->pid
;
1121 dfsm
->sync_epoch
.time
= time(NULL
);
1123 /* invalidate saved checksum */
1124 dfsm
->csum_id
= dfsm
->csum_counter
;
1125 memset(&dfsm
->csum_epoch
, 0, sizeof(dfsm
->csum_epoch
));
1127 dfsm_free_sync_queue(dfsm
);
1129 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1131 cfs_dom_debug(dfsm
->log_domain
, "dfsm mode is %d", mode
);
1133 if (mode
>= DFSM_ERROR_MODE_START
) {
1134 cfs_dom_debug(dfsm
->log_domain
, "already left group - ignore message");
1138 int lowest_nodeid
= 0;
1139 GString
*str
= g_string_new("members: ");
1140 for (int i
= 0; i
< member_list_entries
; i
++) {
1142 g_string_append_printf(str
, i
? ", %d/%d" : "%d/%d",
1143 member_list
[i
].nodeid
, member_list
[i
].pid
);
1145 if (lowest_nodeid
== 0 || lowest_nodeid
> member_list
[i
].nodeid
)
1146 lowest_nodeid
= member_list
[i
].nodeid
;
1148 if (member_list
[i
].nodeid
== dfsm
->nodeid
&&
1149 member_list
[i
].pid
== dfsm
->pid
)
1150 dfsm
->we_are_member
= 1;
1154 if ((dfsm
->we_are_member
|| mode
!= DFSM_MODE_START
))
1155 cfs_dom_message(dfsm
->log_domain
, str
->str
);
1157 g_string_free(str
, 1);
1159 dfsm
->lowest_nodeid
= lowest_nodeid
;
1161 /* NOTE: one node can be in left and joined list at the same time,
1162 so it is better to query member list. Also JOIN/LEAVE lists are
1163 different on different nodes!
1166 dfsm_release_sync_resources(dfsm
, member_list
, member_list_entries
);
1168 if (!dfsm
->we_are_member
) {
1169 if (mode
== DFSM_MODE_START
) {
1170 cfs_dom_debug(dfsm
->log_domain
, "ignore leave message");
1173 cfs_dom_message(dfsm
->log_domain
, "we (%d/%d) left the process group",
1174 dfsm
->nodeid
, dfsm
->pid
);
1178 if (member_list_entries
> 1) {
1180 int qlen
= g_sequence_get_length(dfsm
->msg_queue
);
1181 if (joined_list_entries
&& qlen
) {
1182 /* we need to make sure that all members have the same queue. */
1183 cfs_dom_message(dfsm
->log_domain
, "queue not emtpy - resening %d messages", qlen
);
1184 if (!dfsm_resend_queue(dfsm
)) {
1185 cfs_dom_critical(dfsm
->log_domain
, "dfsm_resend_queue failed");
1190 dfsm_set_mode(dfsm
, DFSM_MODE_START_SYNC
);
1191 if (lowest_nodeid
== dfsm
->nodeid
) {
1192 if (!dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_SYNC_START
, NULL
, 0)) {
1193 cfs_dom_critical(dfsm
->log_domain
, "failed to send SYNC_START message");
1198 dfsm_set_mode(dfsm
, DFSM_MODE_SYNCED
);
1199 dfsm
->sync_info
->local
->synced
= 1;
1200 if (!dfsm_deliver_queue(dfsm
))
1204 if (dfsm
->dfsm_callbacks
->dfsm_confchg_fn
)
1205 dfsm
->dfsm_callbacks
->dfsm_confchg_fn(dfsm
, dfsm
->data
, member_list
, member_list_entries
);
1209 dfsm_set_mode(dfsm
, DFSM_MODE_LEAVE
);
1213 static cpg_callbacks_t cpg_callbacks
= {
1214 .cpg_deliver_fn
= dfsm_cpg_deliver_callback
,
1215 .cpg_confchg_fn
= dfsm_cpg_confchg_callback
,
1221 const char *group_name
,
1222 const char *log_domain
,
1223 guint32 protocol_version
,
1224 dfsm_callbacks_t
*callbacks
)
1226 g_return_val_if_fail(sizeof(dfsm_message_header_t
) == 16, NULL
);
1227 g_return_val_if_fail(sizeof(dfsm_message_state_header_t
) == 32, NULL
);
1228 g_return_val_if_fail(sizeof(dfsm_message_normal_header_t
) == 24, NULL
);
1230 g_return_val_if_fail(callbacks
!= NULL
, NULL
);
1231 g_return_val_if_fail(callbacks
->dfsm_deliver_fn
!= NULL
, NULL
);
1233 g_return_val_if_fail(callbacks
->dfsm_get_state_fn
!= NULL
, NULL
);
1234 g_return_val_if_fail(callbacks
->dfsm_process_state_update_fn
!= NULL
, NULL
);
1235 g_return_val_if_fail(callbacks
->dfsm_process_update_fn
!= NULL
, NULL
);
1236 g_return_val_if_fail(callbacks
->dfsm_commit_fn
!= NULL
, NULL
);
1240 if ((dfsm
= g_new0(dfsm_t
, 1)) == NULL
)
1243 g_mutex_init(&dfsm
->sync_mutex
);
1245 g_cond_init(&dfsm
->sync_cond
);
1247 if (!(dfsm
->results
= g_hash_table_new(g_int64_hash
, g_int64_equal
)))
1250 if (!(dfsm
->msg_queue
= g_sequence_new(NULL
)))
1253 dfsm
->log_domain
= log_domain
;
1255 dfsm
->mode
= DFSM_MODE_START
;
1256 dfsm
->protocol_version
= protocol_version
;
1257 strcpy (dfsm
->cpg_group_name
.value
, group_name
);
1258 dfsm
->cpg_group_name
.length
= strlen (group_name
) + 1;
1260 dfsm
->cpg_callbacks
= &cpg_callbacks
;
1261 dfsm
->dfsm_callbacks
= callbacks
;
1263 dfsm
->members
= g_hash_table_new(dfsm_sync_info_hash
, dfsm_sync_info_equal
);
1267 g_mutex_init(&dfsm
->mode_mutex
);
1277 dfsm_lowest_nodeid(dfsm_t
*dfsm
)
1279 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1281 if (dfsm
->lowest_nodeid
&& (dfsm
->lowest_nodeid
== dfsm
->nodeid
))
1288 dfsm_verify_request(dfsm_t
*dfsm
)
1290 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1292 /* only do when we have lowest nodeid */
1293 if (!dfsm
->lowest_nodeid
|| (dfsm
->lowest_nodeid
!= dfsm
->nodeid
))
1296 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1297 if (mode
!= DFSM_MODE_SYNCED
)
1301 struct iovec iov
[len
];
1303 if (dfsm
->csum_counter
!= dfsm
->csum_id
) {
1304 g_message("delay verify request %016zX", dfsm
->csum_counter
+ 1);
1308 dfsm
->csum_counter
++;
1309 iov
[0].iov_base
= (char *)&dfsm
->csum_counter
;
1310 iov
[0].iov_len
= sizeof(dfsm
->csum_counter
);
1312 cfs_debug("send verify request %016zX", dfsm
->csum_counter
);
1315 result
= dfsm_send_state_message_full(dfsm
, DFSM_MESSAGE_VERIFY_REQUEST
, iov
, len
);
1317 if (result
!= CS_OK
)
1318 cfs_dom_critical(dfsm
->log_domain
, "failed to send VERIFY_REQUEST message");
1327 cs_dispatch_flags_t dispatch_types
)
1329 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1330 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, CS_ERR_INVALID_PARAM
);
1334 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1337 result
= cpg_dispatch(dfsm
->cpg_handle
, dispatch_types
);
1338 if (result
== CS_ERR_TRY_AGAIN
) {
1339 nanosleep(&tvreq
, NULL
);
1341 if ((retries
% 10) == 0)
1342 cfs_dom_message(dfsm
->log_domain
, "cpg_dispatch retry %d", retries
);
1346 if (!(result
== CS_OK
|| result
== CS_ERR_TRY_AGAIN
)) {
1347 cfs_dom_critical(dfsm
->log_domain
, "cpg_dispatch failed: %d", result
);
/*
 * dfsm_initialize: resets the state machine and sets up the CPG connection,
 * handing the connection's file descriptor back through fd.
 *
 * Parameters:
 *   dfsm - state machine instance; NULL yields CS_ERR_INVALID_PARAM.
 *   fd   - output location filled by cpg_fd_get(); NULL yields
 *          CS_ERR_INVALID_PARAM.
 *
 * Visible steps:
 *  - queued messages are dropped, a pending sync message is aborted,
 *    joined and we_are_member are cleared and the mode is set to
 *    DFSM_MODE_START;
 *  - when dfsm->cpg_handle is 0, cpg_initialize() is called with
 *    dfsm->cpg_callbacks; on failure control jumps to err_no_finalize;
 *  - cpg_local_get() stores the local node id in dfsm->nodeid, getpid()
 *    is stored in dfsm->pid, and cpg_context_set() registers dfsm itself
 *    as the handle's context;
 *  - cpg_fd_get() writes the descriptor to *fd;
 *  - every failing CPG call is logged as critical with its result code.
 *
 * The trailing cpg_finalize() / cpg_handle = 0 lines release the handle.
 * NOTE(review): the labels, the remaining goto statements, the returns and
 * the exact extent of the "cpg_handle == 0" block are not visible here;
 * presumably the finalize lines belong to the error path - confirm.
 */
1355 dfsm_initialize(dfsm_t
*dfsm
, int *fd
)
1357 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1358 g_return_val_if_fail(fd
!= NULL
, CS_ERR_INVALID_PARAM
);
1360 /* remove old messages */
1361 dfsm_free_message_queue(dfsm
);
1362 dfsm_send_sync_message_abort(dfsm
);
1364 dfsm
->joined
= FALSE
;
1365 dfsm
->we_are_member
= 0;
1367 dfsm_set_mode(dfsm
, DFSM_MODE_START
);
/* an existing (non-zero) handle is kept; see the file-level note on reusing
 * the CPG handle */
1371 if (dfsm
->cpg_handle
== 0) {
1372 if ((result
= cpg_initialize(&dfsm
->cpg_handle
, dfsm
->cpg_callbacks
)) != CS_OK
) {
1373 cfs_dom_critical(dfsm
->log_domain
, "cpg_initialize failed: %d", result
);
1374 goto err_no_finalize
;
1377 if ((result
= cpg_local_get(dfsm
->cpg_handle
, &dfsm
->nodeid
)) != CS_OK
) {
1378 cfs_dom_critical(dfsm
->log_domain
, "cpg_local_get failed: %d", result
);
1382 dfsm
->pid
= getpid();
/* the dfsm pointer itself is registered as the handle's context */
1384 result
= cpg_context_set(dfsm
->cpg_handle
, dfsm
);
1385 if (result
!= CS_OK
) {
1386 cfs_dom_critical(dfsm
->log_domain
, "cpg_context_set failed: %d", result
);
1391 result
= cpg_fd_get(dfsm
->cpg_handle
, fd
);
1392 if (result
!= CS_OK
) {
1393 cfs_dom_critical(dfsm
->log_domain
, "cpg_fd_get failed: %d", result
);
1400 cpg_finalize(dfsm
->cpg_handle
);
1402 dfsm
->cpg_handle
= 0;
/*
 * dfsm_join: joins the CPG group named by dfsm->cpg_group_name.
 *
 * Preconditions (checked with g_return_val_if_fail):
 *   dfsm != NULL          -> otherwise CS_ERR_INVALID_PARAM
 *   dfsm->cpg_handle != 0 -> otherwise CS_ERR_LIBRARY
 *   dfsm->joined == 0     -> otherwise CS_ERR_EXIST
 *
 * On CS_ERR_TRY_AGAIN from cpg_join() the code sleeps for tvreq and logs a
 * "cpg_join retry" message whenever retries is a multiple of 10. Any final
 * result other than CS_OK is logged as critical. dfsm->joined is set to
 * TRUE afterwards.
 *
 * NOTE(review): the retry control flow and the return statements are not
 * visible here; presumably joined is only set after a successful join -
 * confirm.
 */
1407 dfsm_join(dfsm_t
*dfsm
)
1409 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1410 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, CS_ERR_LIBRARY
);
1411 g_return_val_if_fail(dfsm
->joined
== 0, CS_ERR_EXIST
);
/* back-off interval: 100000000 ns = 100 ms */
1415 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1418 result
= cpg_join(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
);
1419 if (result
== CS_ERR_TRY_AGAIN
) {
1420 nanosleep(&tvreq
, NULL
);
1422 if ((retries
% 10) == 0)
1423 cfs_dom_message(dfsm
->log_domain
, "cpg_join retry %d", retries
);
1427 if (result
!= CS_OK
) {
1428 cfs_dom_critical(dfsm
->log_domain
, "cpg_join failed: %d", result
);
1432 dfsm
->joined
= TRUE
;
/*
 * dfsm_leave: leaves the CPG group named by dfsm->cpg_group_name.
 *
 * Preconditions (checked with g_return_val_if_fail):
 *   dfsm != NULL  -> otherwise CS_ERR_INVALID_PARAM
 *   dfsm->joined  -> otherwise CS_ERR_NOT_EXIST
 *
 * On CS_ERR_TRY_AGAIN from cpg_leave() the code sleeps for tvreq and logs a
 * "cpg_leave retry" message whenever retries is a multiple of 10. Any final
 * result other than CS_OK is logged as critical. dfsm->joined is set to
 * FALSE afterwards.
 *
 * NOTE(review): the retry control flow and the return statements are not
 * visible here; presumably joined is only cleared after a successful
 * leave - confirm.
 */
1437 dfsm_leave (dfsm_t
*dfsm
)
1439 g_return_val_if_fail(dfsm
!= NULL
, CS_ERR_INVALID_PARAM
);
1440 g_return_val_if_fail(dfsm
->joined
, CS_ERR_NOT_EXIST
);
/* back-off interval: 100000000 ns = 100 ms */
1444 struct timespec tvreq
= { .tv_sec
= 0, .tv_nsec
= 100000000 };
1447 result
= cpg_leave(dfsm
->cpg_handle
, &dfsm
->cpg_group_name
);
1448 if (result
== CS_ERR_TRY_AGAIN
) {
1449 nanosleep(&tvreq
, NULL
);
1451 if ((retries
% 10) == 0)
1452 cfs_dom_message(dfsm
->log_domain
, "cpg_leave retry %d", retries
);
1456 if (result
!= CS_OK
) {
1457 cfs_dom_critical(dfsm
->log_domain
, "cpg_leave failed: %d", result
);
1461 dfsm
->joined
= FALSE
;
/*
 * dfsm_finalize: aborts a pending sync message and releases the CPG handle.
 *
 * A NULL dfsm yields FALSE. When dfsm->cpg_handle is non-zero the handle is
 * passed to cpg_finalize() and then reset to 0, and joined / we_are_member
 * are cleared.
 *
 * NOTE(review): the lines between the abort call and the handle check, and
 * the return statement, are not visible here.
 */
1467 dfsm_finalize(dfsm_t
*dfsm
)
1469 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1471 dfsm_send_sync_message_abort(dfsm
);
1476 if (dfsm
->cpg_handle
) {
1477 cpg_finalize(dfsm
->cpg_handle
);
/* handle 0 means "not initialized" to the other functions in this file */
1478 dfsm
->cpg_handle
= 0;
1479 dfsm
->joined
= FALSE
;
1480 dfsm
->we_are_member
= 0;
/*
 * dfsm_destroy: tears down a dfsm instance and the resources it holds.
 *
 * Visible order of operations:
 *  1. dfsm_finalize() is called on the instance;
 *  2. when sync_info, sync_info->data and the dfsm_cleanup_fn callback are
 *     all set, the callback is invoked with (dfsm, dfsm->data, sync_info);
 *  3. the sync queue is freed;
 *  4. mode_mutex, sync_mutex and sync_cond are cleared;
 *  5. the results hash table is destroyed;
 *  6. when msg_queue is set, its messages are freed and the sequence is
 *     released;
 *  7. sync_info is freed;
 *  8. a still non-zero cpg_handle is finalized;
 *  9. the members hash table is destroyed.
 *
 * NOTE(review): whether the dfsm structure itself is freed is not visible
 * here.
 * NOTE(review): g_free() accepts NULL, so the sync_info guard before it
 * looks redundant.
 * NOTE(review): the visible part of dfsm_finalize() resets cpg_handle to 0,
 * so the later cpg_finalize() here looks unreachable - confirm against the
 * lines of dfsm_finalize() that are not visible.
 */
1487 dfsm_destroy(dfsm_t
*dfsm
)
1489 g_return_if_fail(dfsm
!= NULL
);
1491 dfsm_finalize(dfsm
);
1493 if (dfsm
->sync_info
&& dfsm
->sync_info
->data
&& dfsm
->dfsm_callbacks
->dfsm_cleanup_fn
)
1494 dfsm
->dfsm_callbacks
->dfsm_cleanup_fn(dfsm
, dfsm
->data
, dfsm
->sync_info
);
1496 dfsm_free_sync_queue(dfsm
);
1498 g_mutex_clear (&dfsm
->mode_mutex
);
1500 g_mutex_clear (&dfsm
->sync_mutex
);
1502 g_cond_clear (&dfsm
->sync_cond
);
1505 g_hash_table_destroy(dfsm
->results
);
/* the queued messages are freed first, then the sequence container */
1507 if (dfsm
->msg_queue
) {
1508 dfsm_free_message_queue(dfsm
);
1509 g_sequence_free(dfsm
->msg_queue
);
1512 if (dfsm
->sync_info
)
1513 g_free(dfsm
->sync_info
);
1515 if (dfsm
->cpg_handle
)
1516 cpg_finalize(dfsm
->cpg_handle
);
1519 g_hash_table_destroy(dfsm
->members
);
1526 } service_dfsm_private_t
;
/*
 * service_dfsm_finalize: service callback that finalizes the dfsm stored in
 * the service's private context.
 *
 * context is interpreted as a service_dfsm_private_t. A NULL service,
 * context or private->dfsm yields FALSE; otherwise the result of
 * dfsm_finalize() is returned.
 */
1529 service_dfsm_finalize(
1530 cfs_service_t
*service
,
1533 g_return_val_if_fail(service
!= NULL
, FALSE
);
1534 g_return_val_if_fail(context
!= NULL
, FALSE
);
1536 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1537 dfsm_t
*dfsm
= private->dfsm
;
1539 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1541 return dfsm_finalize(dfsm
);
/*
 * service_dfsm_initialize: service callback that initializes the dfsm from
 * the private context and joins its CPG group.
 *
 * context is interpreted as a service_dfsm_private_t. A NULL service,
 * context or private->dfsm yields -1.
 *
 * Visible steps:
 *  - dfsm_restartable() is consulted first;
 *  - dfsm_initialize() is called with the address of a local fd;
 *  - dfsm_join() is called; when it fails, dfsm_finalize() is called
 *    because dispatching is not possible without having joined.
 *
 * NOTE(review): the return statements are not visible here; presumably the
 * failure paths return -1 and success returns fd - confirm.
 */
1545 service_dfsm_initialize(
1546 cfs_service_t
*service
,
1549 g_return_val_if_fail(service
!= NULL
, -1);
1550 g_return_val_if_fail(context
!= NULL
, -1);
1552 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1553 dfsm_t
*dfsm
= private->dfsm
;
1555 g_return_val_if_fail(dfsm
!= NULL
, -1);
1557 /* serious internal error - don't try to recover */
1558 if (!dfsm_restartable(dfsm
))
1564 if ((result
= dfsm_initialize(dfsm
, &fd
)) != CS_OK
)
1567 result
= dfsm_join(dfsm
);
1568 if (result
!= CS_OK
) {
1569 /* we can't dispatch if not joined, so we need to finalize */
1570 dfsm_finalize(dfsm
);
/*
 * service_dfsm_dispatch: service callback that processes one pending CPG
 * event for the dfsm stored in the private context.
 *
 * context is interpreted as a service_dfsm_private_t. A NULL service,
 * context or private->dfsm, or a zero dfsm->cpg_handle, yields FALSE.
 *
 * Visible steps:
 *  - dfsm_dispatch() is called with CS_DISPATCH_ONE; the results
 *    CS_ERR_LIBRARY / CS_ERR_BAD_HANDLE are tested separately from other
 *    non-CS_OK results;
 *  - when the current mode is >= DFSM_ERROR_MODE_START (an error mode),
 *    dfsm_leave() is called and its result is classified the same way,
 *    followed by a test of dfsm->we_are_member;
 *  - further down, dfsm_finalize() is called and the service's restartable
 *    flag is set from dfsm_restartable().
 *
 * NOTE(review): the branch targets, labels and return statements are not
 * visible here, so which paths reach the finalize / set_restartable lines
 * cannot be determined from this listing.
 */
1578 service_dfsm_dispatch(
1579 cfs_service_t
*service
,
1582 g_return_val_if_fail(service
!= NULL
, FALSE
);
1583 g_return_val_if_fail(context
!= NULL
, FALSE
);
1585 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1586 dfsm_t
*dfsm
= private->dfsm
;
1588 g_return_val_if_fail(dfsm
!= NULL
, FALSE
);
1589 g_return_val_if_fail(dfsm
->cpg_handle
!= 0, FALSE
);
1593 result
= dfsm_dispatch(dfsm
, CS_DISPATCH_ONE
);
/* the file-level note states that CS_ERR_LIBRARY is returned when corosync
 * died */
1594 if (result
== CS_ERR_LIBRARY
|| result
== CS_ERR_BAD_HANDLE
)
1596 if (result
!= CS_OK
)
1599 dfsm_mode_t mode
= dfsm_get_mode(dfsm
);
1600 if (mode
>= DFSM_ERROR_MODE_START
) {
1602 result
= dfsm_leave(dfsm
);
1603 if (result
== CS_ERR_LIBRARY
|| result
== CS_ERR_BAD_HANDLE
)
1605 if (result
!= CS_OK
)
1608 if (!dfsm
->we_are_member
)
1616 dfsm_finalize(dfsm
);
1618 cfs_service_set_restartable(service
, dfsm_restartable(dfsm
));
/*
 * Service timer callback: triggers dfsm_verify_request() for the dfsm
 * stored in the private context.
 * NOTE(review): the line carrying the function name is not visible here;
 * presumably this is service_dfsm_timer(), which the cfs_dfsm_callbacks
 * table installs as .cfs_service_timer_fn - confirm.
 *
 * context is interpreted as a service_dfsm_private_t. The function returns
 * without doing anything when service, context or private->dfsm is NULL.
 * The result of dfsm_verify_request() is not used.
 */
1624 cfs_service_t
*service
,
1627 g_return_if_fail(service
!= NULL
);
1628 g_return_if_fail(context
!= NULL
);
1630 service_dfsm_private_t
*private = (service_dfsm_private_t
*)context
;
1631 dfsm_t
*dfsm
= private->dfsm
;
1633 g_return_if_fail(dfsm
!= NULL
);
1635 dfsm_verify_request(dfsm
);
/*
 * Callback table that service_dfsm_new() passes to cfs_service_new(); it
 * wires the initialize, finalize, dispatch and timer hooks to the
 * service_dfsm_* functions of this file.
 */
1638 static cfs_service_callbacks_t cfs_dfsm_callbacks
= {
1639 .cfs_service_initialize_fn
= service_dfsm_initialize
,
1640 .cfs_service_finalize_fn
= service_dfsm_finalize
,
1641 .cfs_service_dispatch_fn
= service_dfsm_dispatch
,
1642 .cfs_service_timer_fn
= service_dfsm_timer
,
/*
 * service_dfsm_new: creates a cfs service wrapping the given dfsm.
 *
 * A NULL dfsm yields NULL. Otherwise a zero-initialized
 * service_dfsm_private_t is allocated with g_new0, its dfsm member is set,
 * and cfs_service_new() is called with the cfs_dfsm_callbacks table, the
 * dfsm's log domain and the private structure as context.
 *
 * NOTE(review): the lines between the allocation and the assignment, and
 * the return statement, are not visible here; presumably the new service
 * is returned - confirm.
 */
1646 service_dfsm_new(dfsm_t
*dfsm
)
1648 cfs_service_t
*service
;
1650 g_return_val_if_fail(dfsm
!= NULL
, NULL
);
1652 service_dfsm_private_t
*private = g_new0(service_dfsm_private_t
, 1);
1656 private->dfsm
= dfsm
;
/* private becomes the context handed to the service_dfsm_* callbacks */
1658 service
= cfs_service_new(&cfs_dfsm_callbacks
, dfsm
->log_domain
, private);
1664 service_dfsm_destroy(cfs_service_t
*service
)
1666 g_return_if_fail(service
!= NULL
);
1668 service_dfsm_private_t
*private =
1669 (service_dfsm_private_t
*)cfs_service_get_context(service
);